redis_module/context/defrag.rs
1use std::alloc::Layout;
2use std::fmt::{Debug, Formatter};
3use std::ptr::NonNull;
4
5use crate::{
6 raw, Context, RedisModule_DefragAlloc, RedisModule_DefragCursorGet,
7 RedisModule_DefragCursorSet, RedisModule_DefragRedisModuleString, RedisModule_DefragShouldStop,
8 RedisString, Status,
9};
10use crate::{RedisError, RedisLockIndicator};
11use linkme::distributed_slice;
12
/// Safe handle over the raw defrag context that Redis passes to defrag callbacks.
///
/// Instances are created with [`DefragContext::new`] or by the `extern "C"`
/// trampolines in this module. The handle wraps a pointer it does not own, so it
/// must not be used after the defrag callback that received the pointer returns.
pub struct DefragContext {
    /// Non-null pointer to the raw Redis defrag context (checked on construction).
    defrag_ctx: NonNull<raw::RedisModuleDefragCtx>,
}
16
17impl Debug for DefragContext {
18 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19 let mut debug_struct = f.debug_struct("DefragContext");
20 let debug_struct = debug_struct.field("defrag_ctx", &self.defrag_ctx);
21 debug_struct.finish()
22 }
23}
24
/// Holding a [`DefragContext`] is an indication that we are currently
/// holding the Redis GIL, which is why it is safe to implement
/// [`RedisLockIndicator`] for [`DefragContext`].
unsafe impl RedisLockIndicator for DefragContext {}
29
30impl DefragContext {
31 /// Creates a new [`DefragContext`] from a poiter to [`raw::RedisModuleDefragCtx`].
32 /// The function is exposed for users that wants to implement the defrag function
33 /// on their module datatype, they can use this function to create [`DefragContext`]
34 /// that can be used in a safely manner.
35 ///
36 /// # Safety
37 ///
38 /// The function is considered unsafe because the provided pointer
39 /// must be a valid pointer to [`raw::RedisModuleDefragCtx`], and the Redis GIL must be held.
40 /// Notice that the returned [`DefragContext`] borrows the pointer to [`raw::RedisModuleDefragCtx`]
41 /// so it can not outlive it (this means that it should not be used once the defrag callback ends).
42 pub unsafe fn new(defrag_ctx: *mut raw::RedisModuleDefragCtx) -> DefragContext {
43 DefragContext {
44 defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
45 }
46 }
47
48 /// When the data type defrag callback iterates complex structures, this
49 /// function should be called periodically. A [`false`] return
50 /// indicates the callback may continue its work. A [`true`]
51 /// indicates it should stop.
52 ///
53 /// When stopped, the callback may use [`Self::set_cursor`] to store its
54 /// position so it can later use [`Self::get_cursor`] to resume defragging.
55 ///
56 /// When stopped and more work is left to be done, the callback should
57 /// return 1. Otherwise, it should return 0.
58 ///
59 /// NOTE: Modules should consider the frequency in which this function is called,
60 /// so it generally makes sense to do small batches of work in between calls.
61 pub fn should_stop(&self) -> bool {
62 let should_stop = unsafe {
63 RedisModule_DefragShouldStop.expect("RedisModule_DefragShouldStop should be available.")(
64 self.defrag_ctx.as_ptr(),
65 )
66 };
67 should_stop != 0
68 }
69
70 /// Store an arbitrary cursor value for future re-use.
71 ///
72 /// This should only be called if [`Self::should_stop`] has returned a non-zero
73 /// value and the defrag callback is about to exit without fully iterating its
74 /// data type.
75 ///
76 /// This behavior is reserved to cases where late defrag is performed. Late
77 /// defrag is selected for keys that implement the `free_effort` callback and
78 /// return a `free_effort` value that is larger than the defrag
79 /// 'active-defrag-max-scan-fields' configuration directive.
80 ///
81 /// Smaller keys, keys that do not implement `free_effort` or the global
82 /// defrag callback are not called in late-defrag mode. In those cases, a
83 /// call to this function will return [`Status::Err`].
84 ///
85 /// The cursor may be used by the module to represent some progress into the
86 /// module's data type. Modules may also store additional cursor-related
87 /// information locally and use the cursor as a flag that indicates when
88 /// traversal of a new key begins. This is possible because the API makes
89 /// a guarantee that concurrent defragmentation of multiple keys will
90 /// not be performed.
91 pub fn set_cursor(&self, cursor: u64) -> Status {
92 unsafe {
93 RedisModule_DefragCursorSet.expect("RedisModule_DefragCursorSet should be available.")(
94 self.defrag_ctx.as_ptr(),
95 cursor,
96 )
97 }
98 .into()
99 }
100
101 /// Fetch a cursor value that has been previously stored using [`Self::set_cursor`].
102 /// If not called for a late defrag operation, [`Err`] will be returned.
103 pub fn get_cursor(&self) -> Result<u64, RedisError> {
104 let mut cursor: u64 = 0;
105 let res: Status = unsafe {
106 RedisModule_DefragCursorGet.expect("RedisModule_DefragCursorGet should be available.")(
107 self.defrag_ctx.as_ptr(),
108 (&mut cursor) as *mut u64,
109 )
110 }
111 .into();
112 if res == Status::Ok {
113 Ok(cursor)
114 } else {
115 Err(RedisError::Str("Could not get cursor value"))
116 }
117 }
118
119 /// Defrag a memory allocation previously allocated by RM_Alloc, RM_Calloc, etc.
120 /// The defragmentation process involves allocating a new memory block and copying
121 /// the contents to it, like realloc().
122 ///
123 /// If defragmentation was not necessary, NULL is returned and the operation has
124 /// no other effect.
125 ///
126 /// # Safety
127 ///
128 /// The function is unsafe because it is assumed that the pointer is valid and previusly
129 /// allocated. It is considered undefined if this is not the case.
130 ///
131 /// If a non-NULL value is returned, the caller should use the new pointer instead
132 /// of the old one and update any reference to the old pointer, which must not
133 /// be used again.
134 pub unsafe fn defrag_realloc<T>(&self, mut ptr: *mut T) -> *mut T {
135 let new_ptr: *mut T = RedisModule_DefragAlloc
136 .expect("RedisModule_DefragAlloc should be available.")(
137 self.defrag_ctx.as_ptr(),
138 ptr.cast(),
139 )
140 .cast();
141 if !new_ptr.is_null() {
142 ptr = new_ptr;
143 }
144 ptr
145 }
146
147 /// Allocate memory using defrag allocator if supported by the
148 /// current Redis server, fallback to regular allocation otherwise.
149 pub fn defrag_alloc<T>(&self, layout: Layout) -> *mut T {
150 unsafe { std::alloc::alloc(layout) }.cast()
151 }
152
153 /// Deallocate memory using defrag deallocator if supported by the
154 /// current Redis server, fallback to regular deallocation otherwise.
155 pub fn defrag_dealloc<T>(&self, ptr: *mut T, layout: Layout) {
156 unsafe { std::alloc::dealloc(ptr.cast(), layout) }
157 }
158
159 /// Defrag a [RedisString]
160 ///
161 /// NOTE: It is only possible to defrag strings that have a single reference.
162 /// Typically this means strings that was copy/cloned using [RedisString::safe_clone]
163 /// or created using [RedisString::new] will not be defrag and will be returned as is.
164 pub fn defrag_redis_string(&self, mut s: RedisString) -> RedisString {
165 let new_inner = unsafe {
166 RedisModule_DefragRedisModuleString
167 .expect("RedisModule_DefragRedisModuleString should be available.")(
168 self.defrag_ctx.as_ptr(),
169 s.inner,
170 )
171 };
172 if !new_inner.is_null() {
173 s.inner = new_inner;
174 }
175 s
176 }
177}
178
/// Callbacks run by the module's global defrag function.
///
/// Every entry is invoked with the current [`DefragContext`] by
/// `defrag_function`, which [`register_defrag_functions`] registers with Redis
/// only when this slice is non-empty. Entries are presumably contributed from
/// other places in the crate/module through `linkme` — confirm against callers.
#[distributed_slice()]
pub static DEFRAG_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
181
/// Callbacks run by `defrag_start_function`.
///
/// Every entry is invoked with the current [`DefragContext`].
/// [`register_defrag_functions`] registers the start/end pair with Redis when
/// this slice or [`DEFRAG_END_FUNCTIONS_LIST`] is non-empty. Presumably invoked
/// when a defrag cycle starts — confirm against the Redis documentation.
#[distributed_slice()]
pub static DEFRAG_START_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
184
/// Callbacks run by `defrag_end_function`.
///
/// Every entry is invoked with the current [`DefragContext`].
/// [`register_defrag_functions`] registers the start/end pair with Redis when
/// this slice or [`DEFRAG_START_FUNCTIONS_LIST`] is non-empty. Presumably invoked
/// when a defrag cycle ends — confirm against the Redis documentation.
#[distributed_slice()]
pub static DEFRAG_END_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
187
188extern "C" fn defrag_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
189 let mut ctx = DefragContext {
190 defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
191 };
192 DEFRAG_FUNCTIONS_LIST.iter().for_each(|callback| {
193 callback(&mut ctx);
194 });
195}
196
197extern "C" fn defrag_start_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
198 let mut ctx = DefragContext {
199 defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
200 };
201 DEFRAG_START_FUNCTIONS_LIST.iter().for_each(|callback| {
202 callback(&mut ctx);
203 });
204}
205
206extern "C" fn defrag_end_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
207 let mut ctx = DefragContext {
208 defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
209 };
210 DEFRAG_END_FUNCTIONS_LIST.iter().for_each(|callback| {
211 callback(&mut ctx);
212 });
213}
214
215/// Register defrag functions if exists.
216pub fn register_defrag_functions(ctx: &Context) -> Result<(), RedisError> {
217 let register_defrag_function = match unsafe { raw::RedisModule_RegisterDefragFunc } {
218 Some(f) => f,
219 None => {
220 ctx.log_warning("Skip register defrag function as defrag is not supported on the current Redis server.");
221 return Ok(());
222 }
223 };
224 if !DEFRAG_FUNCTIONS_LIST.is_empty() {
225 let res = unsafe { register_defrag_function(ctx.ctx, Some(defrag_function)) };
226 if res != raw::REDISMODULE_OK as i32 {
227 return Err(RedisError::Str("Failed register defrag function"));
228 }
229 }
230
231 let register_defrag_callbacks = match unsafe { raw::RedisModule_RegisterDefragCallbacks } {
232 Some(f) => f,
233 None => {
234 ctx.log_warning("Skip register defrag callbacks as defrag callbacks is not supported on the current Redis server.");
235 return Ok(());
236 }
237 };
238 if !DEFRAG_START_FUNCTIONS_LIST.is_empty() || !DEFRAG_END_FUNCTIONS_LIST.is_empty() {
239 let res = unsafe {
240 register_defrag_callbacks(
241 ctx.ctx,
242 Some(defrag_start_function),
243 Some(defrag_end_function),
244 )
245 };
246 if res != raw::REDISMODULE_OK as i32 {
247 return Err(RedisError::Str("Failed register defrag callbacks"));
248 }
249 }
250
251 Ok(())
252}