1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
use std::alloc::Layout;
use std::fmt::{Debug, Formatter};
use std::ptr::NonNull;
use crate::{
raw, Context, RedisModule_DefragAlloc, RedisModule_DefragCursorGet,
RedisModule_DefragCursorSet, RedisModule_DefragRedisModuleString, RedisModule_DefragShouldStop,
RedisString, Status,
};
use crate::{RedisError, RedisLockIndicator};
use linkme::distributed_slice;
pub struct DefragContext {
defrag_ctx: NonNull<raw::RedisModuleDefragCtx>,
}
impl Debug for DefragContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut debug_struct = f.debug_struct("DefragContext");
let debug_struct = debug_struct.field("defrag_ctx", &self.defrag_ctx);
debug_struct.finish()
}
}
/// Having a [DefragContext] is indication that we are
/// currently holding the Redis GIL, this is why it is safe to
/// implement a [RedisLockIndicator] for [DefragContext].
unsafe impl RedisLockIndicator for DefragContext {}
impl DefragContext {
/// Creates a new [`DefragContext`] from a poiter to [`raw::RedisModuleDefragCtx`].
/// The function is exposed for users that wants to implement the defrag function
/// on their module datatype, they can use this function to create [`DefragContext`]
/// that can be used in a safely manner.
///
/// # Safety
///
/// The function is considered unsafe because the provided pointer
/// must be a valid pointer to [`raw::RedisModuleDefragCtx`], and the Redis GIL must be held.
/// Notice that the returned [`DefragContext`] borrows the pointer to [`raw::RedisModuleDefragCtx`]
/// so it can not outlive it (this means that it should not be used once the defrag callback ends).
pub unsafe fn new(defrag_ctx: *mut raw::RedisModuleDefragCtx) -> DefragContext {
DefragContext {
defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
}
}
/// When the data type defrag callback iterates complex structures, this
/// function should be called periodically. A [`false`] return
/// indicates the callback may continue its work. A [`true`]
/// indicates it should stop.
///
/// When stopped, the callback may use [`Self::set_cursor`] to store its
/// position so it can later use [`Self::get_cursor`] to resume defragging.
///
/// When stopped and more work is left to be done, the callback should
/// return 1. Otherwise, it should return 0.
///
/// NOTE: Modules should consider the frequency in which this function is called,
/// so it generally makes sense to do small batches of work in between calls.
pub fn should_stop(&self) -> bool {
let should_stop = unsafe {
RedisModule_DefragShouldStop.expect("RedisModule_DefragShouldStop should be available.")(
self.defrag_ctx.as_ptr(),
)
};
should_stop != 0
}
/// Store an arbitrary cursor value for future re-use.
///
/// This should only be called if [`Self::should_stop`] has returned a non-zero
/// value and the defrag callback is about to exit without fully iterating its
/// data type.
///
/// This behavior is reserved to cases where late defrag is performed. Late
/// defrag is selected for keys that implement the `free_effort` callback and
/// return a `free_effort` value that is larger than the defrag
/// 'active-defrag-max-scan-fields' configuration directive.
///
/// Smaller keys, keys that do not implement `free_effort` or the global
/// defrag callback are not called in late-defrag mode. In those cases, a
/// call to this function will return [`Status::Err`].
///
/// The cursor may be used by the module to represent some progress into the
/// module's data type. Modules may also store additional cursor-related
/// information locally and use the cursor as a flag that indicates when
/// traversal of a new key begins. This is possible because the API makes
/// a guarantee that concurrent defragmentation of multiple keys will
/// not be performed.
pub fn set_cursor(&self, cursor: u64) -> Status {
unsafe {
RedisModule_DefragCursorSet.expect("RedisModule_DefragCursorSet should be available.")(
self.defrag_ctx.as_ptr(),
cursor,
)
}
.into()
}
/// Fetch a cursor value that has been previously stored using [`Self::set_cursor`].
/// If not called for a late defrag operation, [`Err`] will be returned.
pub fn get_cursor(&self) -> Result<u64, RedisError> {
let mut cursor: u64 = 0;
let res: Status = unsafe {
RedisModule_DefragCursorGet.expect("RedisModule_DefragCursorGet should be available.")(
self.defrag_ctx.as_ptr(),
(&mut cursor) as *mut u64,
)
}
.into();
if res == Status::Ok {
Ok(cursor)
} else {
Err(RedisError::Str("Could not get cursor value"))
}
}
/// Defrag a memory allocation previously allocated by RM_Alloc, RM_Calloc, etc.
/// The defragmentation process involves allocating a new memory block and copying
/// the contents to it, like realloc().
///
/// If defragmentation was not necessary, NULL is returned and the operation has
/// no other effect.
///
/// # Safety
///
/// The function is unsafe because it is assumed that the pointer is valid and previusly
/// allocated. It is considered undefined if this is not the case.
///
/// If a non-NULL value is returned, the caller should use the new pointer instead
/// of the old one and update any reference to the old pointer, which must not
/// be used again.
pub unsafe fn defrag_realloc<T>(&self, mut ptr: *mut T) -> *mut T {
let new_ptr: *mut T = RedisModule_DefragAlloc
.expect("RedisModule_DefragAlloc should be available.")(
self.defrag_ctx.as_ptr(),
ptr.cast(),
)
.cast();
if !new_ptr.is_null() {
ptr = new_ptr;
}
ptr
}
/// Allocate memory using defrag allocator if supported by the
/// current Redis server, fallback to regular allocation otherwise.
pub fn defrag_alloc<T>(&self, layout: Layout) -> *mut T {
unsafe { std::alloc::alloc(layout) }.cast()
}
/// Deallocate memory using defrag deallocator if supported by the
/// current Redis server, fallback to regular deallocation otherwise.
pub fn defrag_dealloc<T>(&self, ptr: *mut T, layout: Layout) {
unsafe { std::alloc::dealloc(ptr.cast(), layout) }
}
/// Defrag a [RedisString]
///
/// NOTE: It is only possible to defrag strings that have a single reference.
/// Typically this means strings that was copy/cloned using [RedisString::safe_clone]
/// or created using [RedisString::new] will not be defrag and will be returned as is.
pub fn defrag_redis_string(&self, mut s: RedisString) -> RedisString {
let new_inner = unsafe {
RedisModule_DefragRedisModuleString
.expect("RedisModule_DefragRedisModuleString should be available.")(
self.defrag_ctx.as_ptr(),
s.inner,
)
};
if !new_inner.is_null() {
s.inner = new_inner;
}
s
}
}
#[distributed_slice()]
pub static DEFRAG_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
#[distributed_slice()]
pub static DEFRAG_START_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
#[distributed_slice()]
pub static DEFRAG_END_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
extern "C" fn defrag_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
let mut ctx = DefragContext {
defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
};
DEFRAG_FUNCTIONS_LIST.iter().for_each(|callback| {
callback(&mut ctx);
});
}
extern "C" fn defrag_start_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
let mut ctx = DefragContext {
defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
};
DEFRAG_START_FUNCTIONS_LIST.iter().for_each(|callback| {
callback(&mut ctx);
});
}
extern "C" fn defrag_end_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
let mut ctx = DefragContext {
defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
};
DEFRAG_END_FUNCTIONS_LIST.iter().for_each(|callback| {
callback(&mut ctx);
});
}
/// Register defrag functions if exists.
pub fn register_defrag_functions(ctx: &Context) -> Result<(), RedisError> {
let register_defrag_function = match unsafe { raw::RedisModule_RegisterDefragFunc } {
Some(f) => f,
None => {
ctx.log_warning("Skip register defrag function as defrag is not supported on the current Redis server.");
return Ok(());
}
};
if !DEFRAG_FUNCTIONS_LIST.is_empty() {
let res = unsafe { register_defrag_function(ctx.ctx, Some(defrag_function)) };
if res != raw::REDISMODULE_OK as i32 {
return Err(RedisError::Str("Failed register defrag function"));
}
}
let register_defrag_callbacks = match unsafe { raw::RedisModule_RegisterDefragCallbacks } {
Some(f) => f,
None => {
ctx.log_warning("Skip register defrag callbacks as defrag callbacks is not supported on the current Redis server.");
return Ok(());
}
};
if !DEFRAG_START_FUNCTIONS_LIST.is_empty() || !DEFRAG_END_FUNCTIONS_LIST.is_empty() {
let res = unsafe {
register_defrag_callbacks(
ctx.ctx,
Some(defrag_start_function),
Some(defrag_end_function),
)
};
if res != raw::REDISMODULE_OK as i32 {
return Err(RedisError::Str("Failed register defrag callbacks"));
}
}
Ok(())
}