use bitflags::bitflags;
use valkey_module_macros_internals::api;
use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
use std::os::raw::c_void;
use std::os::raw::{c_char, c_int, c_long, c_longlong};
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicPtr, Ordering};
use crate::key::{KeyFlags, ValkeyKey, ValkeyKeyWritable};
use crate::logging::ValkeyLogLevel;
use crate::raw::{ModuleOptions, Version};
use crate::redisvalue::ValkeyValueKey;
use crate::{
add_info_begin_dict_field, add_info_end_dict_field, add_info_field_double,
add_info_field_long_long, add_info_field_str, add_info_field_unsigned_long_long, raw, utils,
Status,
};
use crate::{add_info_section, ValkeyResult};
use crate::{ValkeyError, ValkeyString, ValkeyValue};
use std::ops::Deref;
use std::ffi::CStr;
use self::call_reply::{create_promise_call_reply, CallResult, PromiseCallReply};
use self::thread_safe::ValkeyLockIndicator;
mod timer;
pub mod blocked;
pub mod call_reply;
pub mod commands;
pub mod info;
pub mod keys_cursor;
pub mod server_events;
pub mod thread_safe;
pub struct CallOptionsBuilder {
options: String,
}
impl Default for CallOptionsBuilder {
fn default() -> Self {
CallOptionsBuilder {
options: "v".to_string(),
}
}
}
#[derive(Clone)]
pub struct CallOptions {
options: CString,
}
#[derive(Clone)]
#[cfg(feature = "min-redis-compatibility-version-7-2")]
pub struct BlockingCallOptions {
options: CString,
}
#[derive(Copy, Clone)]
pub enum CallOptionResp {
Resp2,
Resp3,
Auto,
}
impl CallOptionsBuilder {
pub fn new() -> CallOptionsBuilder {
Self::default()
}
fn add_flag(&mut self, flag: &str) {
self.options.push_str(flag);
}
pub fn no_writes(mut self) -> CallOptionsBuilder {
self.add_flag("W");
self
}
pub fn script_mode(mut self) -> CallOptionsBuilder {
self.add_flag("S");
self
}
pub fn verify_acl(mut self) -> CallOptionsBuilder {
self.add_flag("C");
self
}
pub fn verify_oom(mut self) -> CallOptionsBuilder {
self.add_flag("M");
self
}
pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
self.add_flag("E");
self
}
pub fn replicate(mut self) -> CallOptionsBuilder {
self.add_flag("!");
self
}
pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
match resp {
CallOptionResp::Auto => self.add_flag("0"),
CallOptionResp::Resp2 => (),
CallOptionResp::Resp3 => self.add_flag("3"),
}
self
}
pub fn build(self) -> CallOptions {
CallOptions {
options: CString::new(self.options).unwrap(), }
}
#[cfg(feature = "min-redis-compatibility-version-7-2")]
pub fn build_blocking(mut self) -> BlockingCallOptions {
self.add_flag("K");
BlockingCallOptions {
options: CString::new(self.options).unwrap(), }
}
}
pub struct DetachedContext {
pub(crate) ctx: AtomicPtr<raw::RedisModuleCtx>,
}
impl DetachedContext {
pub const fn new() -> Self {
DetachedContext {
ctx: AtomicPtr::new(ptr::null_mut()),
}
}
}
impl Default for DetachedContext {
fn default() -> Self {
Self::new()
}
}
pub struct DetachedContextGuard {
pub(crate) ctx: Context,
}
unsafe impl ValkeyLockIndicator for DetachedContextGuard {}
impl Drop for DetachedContextGuard {
fn drop(&mut self) {
unsafe {
raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
};
}
}
impl Deref for DetachedContextGuard {
type Target = Context;
fn deref(&self) -> &Self::Target {
&self.ctx
}
}
impl DetachedContext {
pub fn log(&self, level: ValkeyLogLevel, message: &str) {
let c = self.ctx.load(Ordering::Relaxed);
crate::logging::log_internal(c, level, message);
}
pub fn log_debug(&self, message: &str) {
self.log(ValkeyLogLevel::Debug, message);
}
pub fn log_notice(&self, message: &str) {
self.log(ValkeyLogLevel::Notice, message);
}
pub fn log_verbose(&self, message: &str) {
self.log(ValkeyLogLevel::Verbose, message);
}
pub fn log_warning(&self, message: &str) {
self.log(ValkeyLogLevel::Warning, message);
}
pub fn set_context(&self, ctx: &Context) -> Result<(), ValkeyError> {
let c = self.ctx.load(Ordering::Relaxed);
if !c.is_null() {
return Err(ValkeyError::Str("Detached context is already set"));
}
let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
self.ctx.store(ctx, Ordering::Relaxed);
Ok(())
}
pub fn lock(&self) -> DetachedContextGuard {
let c = self.ctx.load(Ordering::Relaxed);
unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
let ctx = Context::new(c);
DetachedContextGuard { ctx }
}
}
unsafe impl Send for DetachedContext {}
unsafe impl Sync for DetachedContext {}
#[derive(Debug)]
pub struct Context {
pub ctx: *mut raw::RedisModuleCtx,
}
#[derive(Debug)]
pub struct ContextUserScope<'ctx> {
ctx: &'ctx Context,
user: *mut raw::RedisModuleUser,
}
impl<'ctx> Drop for ContextUserScope<'ctx> {
fn drop(&mut self) {
self.ctx.deautenticate_user();
unsafe { raw::RedisModule_FreeModuleUser.unwrap()(self.user) };
}
}
impl<'ctx> ContextUserScope<'ctx> {
fn new(ctx: &'ctx Context, user: *mut raw::RedisModuleUser) -> ContextUserScope<'ctx> {
ContextUserScope { ctx, user }
}
}
pub struct StrCallArgs<'a> {
is_owner: bool,
args: Vec<*mut raw::RedisModuleString>,
phantom: std::marker::PhantomData<&'a raw::RedisModuleString>,
}
impl<'a> Drop for StrCallArgs<'a> {
fn drop(&mut self) {
if self.is_owner {
self.args.iter_mut().for_each(|v| unsafe {
raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), *v)
});
}
}
}
impl<'a, T: AsRef<[u8]> + ?Sized> From<&'a [&T]> for StrCallArgs<'a> {
fn from(vals: &'a [&T]) -> Self {
StrCallArgs {
is_owner: true,
args: vals
.iter()
.map(|v| ValkeyString::create_from_slice(std::ptr::null_mut(), v.as_ref()).take())
.collect(),
phantom: std::marker::PhantomData,
}
}
}
impl<'a> From<&'a [&ValkeyString]> for StrCallArgs<'a> {
fn from(vals: &'a [&ValkeyString]) -> Self {
StrCallArgs {
is_owner: false,
args: vals.iter().map(|v| v.inner).collect(),
phantom: std::marker::PhantomData,
}
}
}
impl<'a, const SIZE: usize, T: ?Sized> From<&'a [&T; SIZE]> for StrCallArgs<'a>
where
for<'b> &'a [&'b T]: Into<StrCallArgs<'a>>,
{
fn from(vals: &'a [&T; SIZE]) -> Self {
vals.as_ref().into()
}
}
impl<'a> StrCallArgs<'a> {
pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
&mut self.args
}
}
impl Context {
pub const fn new(ctx: *mut raw::RedisModuleCtx) -> Self {
Self { ctx }
}
#[must_use]
pub const fn dummy() -> Self {
Self {
ctx: ptr::null_mut(),
}
}
pub fn log(&self, level: ValkeyLogLevel, message: &str) {
crate::logging::log_internal(self.ctx, level, message);
}
pub fn log_debug(&self, message: &str) {
self.log(ValkeyLogLevel::Debug, message);
}
pub fn log_notice(&self, message: &str) {
self.log(ValkeyLogLevel::Notice, message);
}
pub fn log_verbose(&self, message: &str) {
self.log(ValkeyLogLevel::Verbose, message);
}
pub fn log_warning(&self, message: &str) {
self.log(ValkeyLogLevel::Warning, message);
}
pub fn auto_memory(&self) {
unsafe {
raw::RedisModule_AutoMemory.unwrap()(self.ctx);
}
}
#[must_use]
pub fn is_keys_position_request(&self) -> bool {
if cfg!(test) {
return false;
}
(unsafe { raw::RedisModule_IsKeysPositionRequest.unwrap()(self.ctx) }) != 0
}
pub fn key_at_pos(&self, pos: i32) {
unsafe {
raw::RedisModule_KeyAtPos.unwrap()(self.ctx, pos as c_int);
}
}
fn call_internal<
'ctx,
'a,
T: Into<StrCallArgs<'a>>,
R: From<PromiseCallReply<'static, 'ctx>>,
>(
&'ctx self,
command: &str,
fmt: *const c_char,
args: T,
) -> R {
let mut call_args: StrCallArgs = args.into();
let final_args = call_args.args_mut();
let cmd = CString::new(command).unwrap();
let reply: *mut raw::RedisModuleCallReply = unsafe {
let p_call = raw::RedisModule_Call.unwrap();
p_call(
self.ctx,
cmd.as_ptr(),
fmt,
final_args.as_mut_ptr(),
final_args.len(),
)
};
let promise = create_promise_call_reply(self, NonNull::new(reply));
R::from(promise)
}
pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> ValkeyResult {
self.call_internal::<_, CallResult>(command, raw::FMT, args)
.map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))
}
pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<CallResult<'static>>>(
&self,
command: &str,
options: &CallOptions,
args: T,
) -> R {
let res: CallResult<'static> =
self.call_internal(command, options.options.as_ptr() as *const c_char, args);
R::from(res)
}
#[cfg(feature = "min-redis-compatibility-version-7-2")]
pub fn call_blocking<
'ctx,
'a,
T: Into<StrCallArgs<'a>>,
R: From<PromiseCallReply<'static, 'ctx>>,
>(
&'ctx self,
command: &str,
options: &BlockingCallOptions,
args: T,
) -> R {
self.call_internal(command, options.options.as_ptr() as *const c_char, args)
}
#[must_use]
pub fn str_as_legal_resp_string(s: &str) -> CString {
CString::new(
s.chars()
.map(|c| match c {
'\r' | '\n' | '\0' => b' ',
_ => c as u8,
})
.collect::<Vec<_>>(),
)
.unwrap()
}
#[allow(clippy::must_use_candidate)]
pub fn reply_simple_string(&self, s: &str) -> raw::Status {
let msg = Self::str_as_legal_resp_string(s);
raw::reply_with_simple_string(self.ctx, msg.as_ptr())
}
#[allow(clippy::must_use_candidate)]
pub fn reply_error_string(&self, s: &str) -> raw::Status {
let msg = Self::str_as_legal_resp_string(s);
unsafe { raw::RedisModule_ReplyWithError.unwrap()(self.ctx, msg.as_ptr()).into() }
}
pub fn reply_with_key(&self, result: ValkeyValueKey) -> raw::Status {
match result {
ValkeyValueKey::Integer(i) => raw::reply_with_long_long(self.ctx, i),
ValkeyValueKey::String(s) => {
raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
}
ValkeyValueKey::BulkString(b) => {
raw::reply_with_string_buffer(self.ctx, b.as_ptr().cast::<c_char>(), b.len())
}
ValkeyValueKey::BulkValkeyString(s) => raw::reply_with_string(self.ctx, s.inner),
ValkeyValueKey::Bool(b) => raw::reply_with_bool(self.ctx, b.into()),
}
}
#[allow(clippy::must_use_candidate)]
pub fn reply(&self, result: ValkeyResult) -> raw::Status {
match result {
Ok(ValkeyValue::Bool(v)) => raw::reply_with_bool(self.ctx, v.into()),
Ok(ValkeyValue::Integer(v)) => raw::reply_with_long_long(self.ctx, v),
Ok(ValkeyValue::Float(v)) => raw::reply_with_double(self.ctx, v),
Ok(ValkeyValue::SimpleStringStatic(s)) => {
let msg = CString::new(s).unwrap();
raw::reply_with_simple_string(self.ctx, msg.as_ptr())
}
Ok(ValkeyValue::SimpleString(s)) => {
let msg = CString::new(s).unwrap();
raw::reply_with_simple_string(self.ctx, msg.as_ptr())
}
Ok(ValkeyValue::BulkString(s)) => {
raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
}
Ok(ValkeyValue::BigNumber(s)) => {
raw::reply_with_big_number(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
}
Ok(ValkeyValue::VerbatimString((format, data))) => raw::reply_with_verbatim_string(
self.ctx,
data.as_ptr().cast(),
data.len(),
format.0.as_ptr().cast(),
),
Ok(ValkeyValue::BulkValkeyString(s)) => raw::reply_with_string(self.ctx, s.inner),
Ok(ValkeyValue::StringBuffer(s)) => {
raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
}
Ok(ValkeyValue::Array(array)) => {
raw::reply_with_array(self.ctx, array.len() as c_long);
for elem in array {
self.reply(Ok(elem));
}
raw::Status::Ok
}
Ok(ValkeyValue::Map(map)) => {
raw::reply_with_map(self.ctx, map.len() as c_long);
for (key, value) in map {
self.reply_with_key(key);
self.reply(Ok(value));
}
raw::Status::Ok
}
Ok(ValkeyValue::OrderedMap(map)) => {
raw::reply_with_map(self.ctx, map.len() as c_long);
for (key, value) in map {
self.reply_with_key(key);
self.reply(Ok(value));
}
raw::Status::Ok
}
Ok(ValkeyValue::Set(set)) => {
raw::reply_with_set(self.ctx, set.len() as c_long);
set.into_iter().for_each(|e| {
self.reply_with_key(e);
});
raw::Status::Ok
}
Ok(ValkeyValue::OrderedSet(set)) => {
raw::reply_with_set(self.ctx, set.len() as c_long);
set.into_iter().for_each(|e| {
self.reply_with_key(e);
});
raw::Status::Ok
}
Ok(ValkeyValue::Null) => raw::reply_with_null(self.ctx),
Ok(ValkeyValue::NoReply) => raw::Status::Ok,
Ok(ValkeyValue::StaticError(s)) => self.reply_error_string(s),
Err(ValkeyError::WrongArity) => unsafe {
if self.is_keys_position_request() {
raw::Status::Err
} else {
raw::RedisModule_WrongArity.unwrap()(self.ctx).into()
}
},
Err(ValkeyError::WrongType) => {
self.reply_error_string(ValkeyError::WrongType.to_string().as_str())
}
Err(ValkeyError::String(s)) => self.reply_error_string(s.as_str()),
Err(ValkeyError::Str(s)) => self.reply_error_string(s),
}
}
#[must_use]
pub fn open_key(&self, key: &ValkeyString) -> ValkeyKey {
ValkeyKey::open(self.ctx, key)
}
#[must_use]
pub fn open_key_with_flags(&self, key: &ValkeyString, flags: KeyFlags) -> ValkeyKey {
ValkeyKey::open_with_flags(self.ctx, key, flags)
}
#[must_use]
pub fn open_key_writable(&self, key: &ValkeyString) -> ValkeyKeyWritable {
ValkeyKeyWritable::open(self.ctx, key)
}
#[must_use]
pub fn open_key_writable_with_flags(
&self,
key: &ValkeyString,
flags: KeyFlags,
) -> ValkeyKeyWritable {
ValkeyKeyWritable::open_with_flags(self.ctx, key, flags)
}
pub fn replicate_verbatim(&self) {
raw::replicate_verbatim(self.ctx);
}
pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
raw::replicate(self.ctx, command, args);
}
#[must_use]
pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> ValkeyString {
ValkeyString::create(NonNull::new(self.ctx), s)
}
#[must_use]
pub const fn get_raw(&self) -> *mut raw::RedisModuleCtx {
self.ctx
}
pub unsafe fn export_shared_api(
&self,
func: *const ::std::os::raw::c_void,
name: *const ::std::os::raw::c_char,
) {
raw::export_shared_api(self.ctx, func, name);
}
#[allow(clippy::must_use_candidate)]
pub fn notify_keyspace_event(
&self,
event_type: raw::NotifyEvent,
event: &str,
keyname: &ValkeyString,
) -> raw::Status {
unsafe { raw::notify_keyspace_event(self.ctx, event_type, event, keyname) }
}
pub fn current_command_name(&self) -> Result<String, ValkeyError> {
unsafe {
match raw::RedisModule_GetCurrentCommandName {
Some(cmd) => Ok(CStr::from_ptr(cmd(self.ctx)).to_str().unwrap().to_string()),
None => Err(ValkeyError::Str(
"API RedisModule_GetCurrentCommandName is not available",
)),
}
}
}
pub fn get_redis_version(&self) -> Result<Version, ValkeyError> {
self.get_redis_version_internal(false)
}
pub fn get_redis_version_rm_call(&self) -> Result<Version, ValkeyError> {
self.get_redis_version_internal(true)
}
pub fn version_from_info(info: ValkeyValue) -> Result<Version, ValkeyError> {
if let ValkeyValue::SimpleString(info_str) = info {
if let Some(ver) = utils::get_regexp_captures(
info_str.as_str(),
r"(?m)\bredis_version:([0-9]+)\.([0-9]+)\.([0-9]+)\b",
) {
return Ok(Version {
major: ver[0].parse::<c_int>().unwrap(),
minor: ver[1].parse::<c_int>().unwrap(),
patch: ver[2].parse::<c_int>().unwrap(),
});
}
}
Err(ValkeyError::Str("Error getting redis_version"))
}
#[allow(clippy::not_unsafe_ptr_arg_deref)]
fn get_redis_version_internal(&self, force_use_rm_call: bool) -> Result<Version, ValkeyError> {
match unsafe { raw::RedisModule_GetServerVersion } {
Some(api) if !force_use_rm_call => {
Ok(Version::from(unsafe { api() }))
}
_ => {
if let Ok(info) = self.call("info", &["server"]) {
Self::version_from_info(info)
} else {
Err(ValkeyError::Str("Error calling \"info server\""))
}
}
}
}
pub fn set_module_options(&self, options: ModuleOptions) {
unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
}
pub fn get_flags(&self) -> ContextFlags {
ContextFlags::from_bits_truncate(unsafe {
raw::RedisModule_GetContextFlags.unwrap()(self.ctx)
})
}
pub fn get_current_user(&self) -> ValkeyString {
let user = unsafe { raw::RedisModule_GetCurrentUserName.unwrap()(self.ctx) };
ValkeyString::from_redis_module_string(ptr::null_mut(), user)
}
pub fn authenticate_user(
&self,
user_name: &ValkeyString,
) -> Result<ContextUserScope<'_>, ValkeyError> {
let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
if user.is_null() {
return Err(ValkeyError::Str("User does not exists or disabled"));
}
unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
Ok(ContextUserScope::new(self, user))
}
fn deautenticate_user(&self) {
unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
}
pub fn acl_check_key_permission(
&self,
user_name: &ValkeyString,
key_name: &ValkeyString,
permissions: &AclPermissions,
) -> Result<(), ValkeyError> {
let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
if user.is_null() {
return Err(ValkeyError::Str("User does not exists or disabled"));
}
let acl_permission_result: raw::Status = unsafe {
raw::RedisModule_ACLCheckKeyPermissions.unwrap()(
user,
key_name.inner,
permissions.bits(),
)
}
.into();
unsafe { raw::RedisModule_FreeModuleUser.unwrap()(user) };
let acl_permission_result: Result<(), &str> = acl_permission_result.into();
acl_permission_result
.map_err(|_e| ValkeyError::Str("User does not have permissions on key"))
}
api!(
[RedisModule_AddPostNotificationJob],
pub fn add_post_notification_job<F: FnOnce(&Context) + 'static>(
&self,
callback: F,
) -> Status {
let callback = Box::into_raw(Box::new(Some(callback)));
unsafe {
RedisModule_AddPostNotificationJob(
self.ctx,
Some(post_notification_job::<F>),
callback as *mut c_void,
Some(post_notification_job_free_callback::<F>),
)
}
.into()
}
);
api!(
[RedisModule_AvoidReplicaTraffic],
pub fn avoid_replication_traffic(&self) -> bool {
unsafe { RedisModule_AvoidReplicaTraffic() == 1 }
}
);
fn is_enterprise_internal(&self) -> Result<bool, ValkeyError> {
let info_res = self.call("info", &["server"])?;
let info = match &info_res {
ValkeyValue::BulkValkeyString(res) => res.try_as_str()?,
ValkeyValue::SimpleString(res) => res.as_str(),
_ => return Err(ValkeyError::Str("Mismatch call reply type")),
};
Ok(info.contains("rlec_version:"))
}
pub fn is_enterprise(&self) -> bool {
self.is_enterprise_internal().unwrap_or_else(|e| {
log::error!("Failed getting deployment type, assuming oss. Error: {e}.");
false
})
}
}
extern "C" fn post_notification_job_free_callback<F: FnOnce(&Context)>(pd: *mut c_void) {
unsafe {
drop(Box::from_raw(pd as *mut Option<F>));
};
}
extern "C" fn post_notification_job<F: FnOnce(&Context)>(
ctx: *mut raw::RedisModuleCtx,
pd: *mut c_void,
) {
let callback = unsafe { &mut *(pd as *mut Option<F>) };
let ctx = Context::new(ctx);
callback.take().map_or_else(
|| {
ctx.log(
ValkeyLogLevel::Warning,
"Got a None callback on post notification job.",
)
},
|callback| {
callback(&ctx);
},
);
}
unsafe impl ValkeyLockIndicator for Context {}
bitflags! {
#[derive(Debug)]
pub struct AclPermissions : c_int {
const ACCESS = raw::REDISMODULE_CMD_KEY_ACCESS as c_int;
const INSERT = raw::REDISMODULE_CMD_KEY_INSERT as c_int;
const DELETE = raw::REDISMODULE_CMD_KEY_DELETE as c_int;
const UPDATE = raw::REDISMODULE_CMD_KEY_UPDATE as c_int;
}
}
#[derive(Debug, Clone)]
pub enum InfoContextBuilderFieldBottomLevelValue {
String(String),
I64(i64),
U64(u64),
F64(f64),
}
impl From<String> for InfoContextBuilderFieldBottomLevelValue {
fn from(value: String) -> Self {
Self::String(value)
}
}
impl From<&str> for InfoContextBuilderFieldBottomLevelValue {
fn from(value: &str) -> Self {
Self::String(value.to_owned())
}
}
impl From<i64> for InfoContextBuilderFieldBottomLevelValue {
fn from(value: i64) -> Self {
Self::I64(value)
}
}
impl From<u64> for InfoContextBuilderFieldBottomLevelValue {
fn from(value: u64) -> Self {
Self::U64(value)
}
}
#[derive(Debug, Clone)]
pub enum InfoContextBuilderFieldTopLevelValue {
Value(InfoContextBuilderFieldBottomLevelValue),
Dictionary {
name: String,
fields: InfoContextFieldBottomLevelData,
},
}
impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<T>
for InfoContextBuilderFieldTopLevelValue
{
fn from(value: T) -> Self {
Self::Value(value.into())
}
}
#[derive(Debug)]
pub struct InfoContextBuilderDictionaryBuilder<'a> {
info_section_builder: InfoContextBuilderSectionBuilder<'a>,
name: String,
fields: InfoContextFieldBottomLevelData,
}
impl<'a> InfoContextBuilderDictionaryBuilder<'a> {
pub fn field<F: Into<InfoContextBuilderFieldBottomLevelValue>>(
mut self,
name: &str,
value: F,
) -> ValkeyResult<Self> {
if self.fields.iter().any(|k| k.0 .0 == name) {
return Err(ValkeyError::String(format!(
"Found duplicate key '{name}' in the info dictionary '{}'",
self.name
)));
}
self.fields.push((name.to_owned(), value.into()).into());
Ok(self)
}
pub fn build_dictionary(self) -> ValkeyResult<InfoContextBuilderSectionBuilder<'a>> {
let name = self.name;
let name_ref = name.clone();
self.info_section_builder.field(
&name_ref,
InfoContextBuilderFieldTopLevelValue::Dictionary {
name,
fields: self.fields.to_owned(),
},
)
}
}
#[derive(Debug)]
pub struct InfoContextBuilderSectionBuilder<'a> {
info_builder: InfoContextBuilder<'a>,
name: String,
fields: InfoContextFieldTopLevelData,
}
impl<'a> InfoContextBuilderSectionBuilder<'a> {
pub fn field<F: Into<InfoContextBuilderFieldTopLevelValue>>(
mut self,
name: &str,
value: F,
) -> ValkeyResult<Self> {
if self.fields.iter().any(|(k, _)| k == name) {
return Err(ValkeyError::String(format!(
"Found duplicate key '{name}' in the info section '{}'",
self.name
)));
}
self.fields.push((name.to_owned(), value.into()));
Ok(self)
}
pub fn add_dictionary(self, dictionary_name: &str) -> InfoContextBuilderDictionaryBuilder<'a> {
InfoContextBuilderDictionaryBuilder {
info_section_builder: self,
name: dictionary_name.to_owned(),
fields: InfoContextFieldBottomLevelData::default(),
}
}
pub fn build_section(mut self) -> ValkeyResult<InfoContextBuilder<'a>> {
if self
.info_builder
.sections
.iter()
.any(|(k, _)| k == &self.name)
{
return Err(ValkeyError::String(format!(
"Found duplicate section in the Info reply: {}",
self.name
)));
}
self.info_builder
.sections
.push((self.name.clone(), self.fields));
Ok(self.info_builder)
}
}
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct InfoContextBottomLevelFieldData(pub (String, InfoContextBuilderFieldBottomLevelValue));
impl Deref for InfoContextBottomLevelFieldData {
type Target = (String, InfoContextBuilderFieldBottomLevelValue);
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for InfoContextBottomLevelFieldData {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<(String, T)>
for InfoContextBottomLevelFieldData
{
fn from(value: (String, T)) -> Self {
Self((value.0, value.1.into()))
}
}
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct InfoContextFieldBottomLevelData(pub Vec<InfoContextBottomLevelFieldData>);
impl Deref for InfoContextFieldBottomLevelData {
type Target = Vec<InfoContextBottomLevelFieldData>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for InfoContextFieldBottomLevelData {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub type InfoContextFieldTopLevelData = Vec<(String, InfoContextBuilderFieldTopLevelValue)>;
pub type OneInfoSectionData = (String, InfoContextFieldTopLevelData);
pub type InfoContextTreeData = Vec<OneInfoSectionData>;
impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<BTreeMap<String, T>>
for InfoContextFieldBottomLevelData
{
fn from(value: BTreeMap<String, T>) -> Self {
Self(
value
.into_iter()
.map(|e| (e.0, e.1.into()).into())
.collect(),
)
}
}
impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<HashMap<String, T>>
for InfoContextFieldBottomLevelData
{
fn from(value: HashMap<String, T>) -> Self {
Self(
value
.into_iter()
.map(|e| (e.0, e.1.into()).into())
.collect(),
)
}
}
#[derive(Debug)]
pub struct InfoContextBuilder<'a> {
context: &'a InfoContext,
sections: InfoContextTreeData,
}
impl<'a> InfoContextBuilder<'a> {
fn add_bottom_level_field(
&self,
key: &str,
value: &InfoContextBuilderFieldBottomLevelValue,
) -> ValkeyResult<()> {
use InfoContextBuilderFieldBottomLevelValue as BottomLevel;
match value {
BottomLevel::String(string) => add_info_field_str(self.context.ctx, key, string),
BottomLevel::I64(number) => add_info_field_long_long(self.context.ctx, key, *number),
BottomLevel::U64(number) => {
add_info_field_unsigned_long_long(self.context.ctx, key, *number)
}
BottomLevel::F64(number) => add_info_field_double(self.context.ctx, key, *number),
}
.into()
}
fn add_top_level_fields(&self, fields: &InfoContextFieldTopLevelData) -> ValkeyResult<()> {
use InfoContextBuilderFieldTopLevelValue as TopLevel;
fields.iter().try_for_each(|(key, value)| match value {
TopLevel::Value(bottom_level) => self.add_bottom_level_field(key, bottom_level),
TopLevel::Dictionary { name, fields } => {
std::convert::Into::<ValkeyResult<()>>::into(add_info_begin_dict_field(
self.context.ctx,
name,
))?;
fields
.iter()
.try_for_each(|f| self.add_bottom_level_field(&f.0 .0, &f.0 .1))?;
add_info_end_dict_field(self.context.ctx).into()
}
})
}
fn finalise_data(&self) -> ValkeyResult<()> {
self.sections
.iter()
.try_for_each(|(section_name, section_fields)| -> ValkeyResult<()> {
if add_info_section(self.context.ctx, Some(section_name)) == Status::Ok {
self.add_top_level_fields(section_fields)
} else {
Ok(())
}
})
}
pub fn build_info(self) -> ValkeyResult<&'a InfoContext> {
self.finalise_data().map(|_| self.context)
}
pub fn add_section(self, name: &'a str) -> InfoContextBuilderSectionBuilder {
InfoContextBuilderSectionBuilder {
info_builder: self,
name: name.to_owned(),
fields: InfoContextFieldTopLevelData::new(),
}
}
pub(crate) fn add_section_unchecked(mut self, section: OneInfoSectionData) -> Self {
self.sections.push(section);
self
}
}
impl<'a> From<&'a InfoContext> for InfoContextBuilder<'a> {
fn from(context: &'a InfoContext) -> Self {
Self {
context,
sections: InfoContextTreeData::new(),
}
}
}
#[derive(Debug)]
pub struct InfoContext {
pub ctx: *mut raw::RedisModuleInfoCtx,
}
impl InfoContext {
pub const fn new(ctx: *mut raw::RedisModuleInfoCtx) -> Self {
Self { ctx }
}
pub fn builder(&self) -> InfoContextBuilder<'_> {
InfoContextBuilder::from(self)
}
pub fn build_one_section<T: Into<OneInfoSectionData>>(&self, data: T) -> ValkeyResult<()> {
self.builder()
.add_section_unchecked(data.into())
.build_info()?;
Ok(())
}
#[deprecated = "Please use [`InfoContext::builder`] instead."]
pub fn add_info_section(&self, name: Option<&str>) -> Status {
add_info_section(self.ctx, name)
}
#[deprecated = "Please use [`InfoContext::builder`] instead."]
pub fn add_info_field_str(&self, name: &str, content: &str) -> Status {
add_info_field_str(self.ctx, name, content)
}
#[deprecated = "Please use [`InfoContext::builder`] instead."]
pub fn add_info_field_long_long(&self, name: &str, value: c_longlong) -> Status {
add_info_field_long_long(self.ctx, name, value)
}
}
bitflags! {
pub struct ContextFlags : c_int {
const LUA = raw::REDISMODULE_CTX_FLAGS_LUA as c_int;
const MULTI = raw::REDISMODULE_CTX_FLAGS_MULTI as c_int;
const MASTER = raw::REDISMODULE_CTX_FLAGS_MASTER as c_int;
const SLAVE = raw::REDISMODULE_CTX_FLAGS_SLAVE as c_int;
const READONLY = raw::REDISMODULE_CTX_FLAGS_READONLY as c_int;
const CLUSTER = raw::REDISMODULE_CTX_FLAGS_CLUSTER as c_int;
const AOF = raw::REDISMODULE_CTX_FLAGS_AOF as c_int;
const RDB = raw::REDISMODULE_CTX_FLAGS_RDB as c_int;
const MAXMEMORY = raw::REDISMODULE_CTX_FLAGS_MAXMEMORY as c_int;
const EVICTED = raw::REDISMODULE_CTX_FLAGS_EVICT as c_int;
const OOM = raw::REDISMODULE_CTX_FLAGS_OOM as c_int;
const OOM_WARNING = raw::REDISMODULE_CTX_FLAGS_OOM_WARNING as c_int;
const REPLICATED = raw::REDISMODULE_CTX_FLAGS_REPLICATED as c_int;
const LOADING = raw::REDISMODULE_CTX_FLAGS_LOADING as c_int;
const REPLICA_IS_STALE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE as c_int;
const REPLICA_IS_CONNECTING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING as c_int;
const REPLICA_IS_TRANSFERRING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING as c_int;
const REPLICA_IS_ONLINE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE as c_int;
const ACTIVE_CHILD = raw::REDISMODULE_CTX_FLAGS_ACTIVE_CHILD as c_int;
const IS_CHILD = raw::REDISMODULE_CTX_FLAGS_IS_CHILD as c_int;
const MULTI_DIRTY = raw::REDISMODULE_CTX_FLAGS_MULTI_DIRTY as c_int;
const DENY_BLOCKING = raw::REDISMODULE_CTX_FLAGS_DENY_BLOCKING as c_int;
const FLAGS_RESP3 = raw::REDISMODULE_CTX_FLAGS_RESP3 as c_int;
const ASYNC_LOADING = raw::REDISMODULE_CTX_FLAGS_ASYNC_LOADING as c_int;
}
}