Skip to main content

redis_module/context/
defrag.rs

1use std::alloc::Layout;
2use std::fmt::{Debug, Formatter};
3use std::ptr::NonNull;
4
5use crate::{
6    raw, Context, RedisModule_DefragAlloc, RedisModule_DefragCursorGet,
7    RedisModule_DefragCursorSet, RedisModule_DefragRedisModuleString, RedisModule_DefragShouldStop,
8    RedisString, Status,
9};
10use crate::{RedisError, RedisLockIndicator};
11use linkme::distributed_slice;
12
13pub struct DefragContext {
14    defrag_ctx: NonNull<raw::RedisModuleDefragCtx>,
15}
16
17impl Debug for DefragContext {
18    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19        let mut debug_struct = f.debug_struct("DefragContext");
20        let debug_struct = debug_struct.field("defrag_ctx", &self.defrag_ctx);
21        debug_struct.finish()
22    }
23}
24
25/// Having a [DefragContext] is indication that we are
26/// currently holding the Redis GIL, this is why it is safe to
27/// implement a [RedisLockIndicator] for [DefragContext].
28unsafe impl RedisLockIndicator for DefragContext {}
29
30impl DefragContext {
31    /// Creates a new [`DefragContext`] from a poiter to [`raw::RedisModuleDefragCtx`].
32    /// The function is exposed for users that wants to implement the defrag function
33    /// on their module datatype, they can use this function to create [`DefragContext`]
34    /// that can be used in a safely manner.
35    ///
36    /// # Safety
37    ///
38    /// The function is considered unsafe because the provided pointer
39    /// must be a valid pointer to [`raw::RedisModuleDefragCtx`], and the Redis GIL must be held.
40    /// Notice that the returned [`DefragContext`] borrows the pointer to [`raw::RedisModuleDefragCtx`]
41    /// so it can not outlive it (this means that it should not be used once the defrag callback ends).
42    pub unsafe fn new(defrag_ctx: *mut raw::RedisModuleDefragCtx) -> DefragContext {
43        DefragContext {
44            defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
45        }
46    }
47
48    /// When the data type defrag callback iterates complex structures, this
49    /// function should be called periodically. A [`false`] return
50    /// indicates the callback may continue its work. A [`true`]
51    /// indicates it should stop.
52    ///
53    /// When stopped, the callback may use [`Self::set_cursor`] to store its
54    /// position so it can later use [`Self::get_cursor`] to resume defragging.
55    ///
56    /// When stopped and more work is left to be done, the callback should
57    /// return 1. Otherwise, it should return 0.
58    ///
59    /// NOTE: Modules should consider the frequency in which this function is called,
60    /// so it generally makes sense to do small batches of work in between calls.
61    pub fn should_stop(&self) -> bool {
62        let should_stop = unsafe {
63            RedisModule_DefragShouldStop.expect("RedisModule_DefragShouldStop should be available.")(
64                self.defrag_ctx.as_ptr(),
65            )
66        };
67        should_stop != 0
68    }
69
70    /// Store an arbitrary cursor value for future re-use.
71    ///
72    /// This should only be called if [`Self::should_stop`] has returned a non-zero
73    /// value and the defrag callback is about to exit without fully iterating its
74    /// data type.
75    ///
76    /// This behavior is reserved to cases where late defrag is performed. Late
77    /// defrag is selected for keys that implement the `free_effort` callback and
78    /// return a `free_effort` value that is larger than the defrag
79    /// 'active-defrag-max-scan-fields' configuration directive.
80    ///
81    /// Smaller keys, keys that do not implement `free_effort` or the global
82    /// defrag callback are not called in late-defrag mode. In those cases, a
83    /// call to this function will return [`Status::Err`].
84    ///
85    /// The cursor may be used by the module to represent some progress into the
86    /// module's data type. Modules may also store additional cursor-related
87    /// information locally and use the cursor as a flag that indicates when
88    /// traversal of a new key begins. This is possible because the API makes
89    /// a guarantee that concurrent defragmentation of multiple keys will
90    /// not be performed.
91    pub fn set_cursor(&self, cursor: u64) -> Status {
92        unsafe {
93            RedisModule_DefragCursorSet.expect("RedisModule_DefragCursorSet should be available.")(
94                self.defrag_ctx.as_ptr(),
95                cursor,
96            )
97        }
98        .into()
99    }
100
101    /// Fetch a cursor value that has been previously stored using [`Self::set_cursor`].
102    /// If not called for a late defrag operation, [`Err`] will be returned.
103    pub fn get_cursor(&self) -> Result<u64, RedisError> {
104        let mut cursor: u64 = 0;
105        let res: Status = unsafe {
106            RedisModule_DefragCursorGet.expect("RedisModule_DefragCursorGet should be available.")(
107                self.defrag_ctx.as_ptr(),
108                (&mut cursor) as *mut u64,
109            )
110        }
111        .into();
112        if res == Status::Ok {
113            Ok(cursor)
114        } else {
115            Err(RedisError::Str("Could not get cursor value"))
116        }
117    }
118
119    /// Defrag a memory allocation previously allocated by RM_Alloc, RM_Calloc, etc.
120    /// The defragmentation process involves allocating a new memory block and copying
121    /// the contents to it, like realloc().
122    ///
123    /// If defragmentation was not necessary, NULL is returned and the operation has
124    /// no other effect.
125    ///
126    /// # Safety
127    ///
128    /// The function is unsafe because it is assumed that the pointer is valid and previusly
129    /// allocated. It is considered undefined if this is not the case.
130    ///
131    /// If a non-NULL value is returned, the caller should use the new pointer instead
132    /// of the old one and update any reference to the old pointer, which must not
133    /// be used again.
134    pub unsafe fn defrag_realloc<T>(&self, mut ptr: *mut T) -> *mut T {
135        let new_ptr: *mut T = RedisModule_DefragAlloc
136            .expect("RedisModule_DefragAlloc should be available.")(
137            self.defrag_ctx.as_ptr(),
138            ptr.cast(),
139        )
140        .cast();
141        if !new_ptr.is_null() {
142            ptr = new_ptr;
143        }
144        ptr
145    }
146
147    /// Allocate memory using defrag allocator if supported by the
148    /// current Redis server, fallback to regular allocation otherwise.
149    pub fn defrag_alloc<T>(&self, layout: Layout) -> *mut T {
150        unsafe { std::alloc::alloc(layout) }.cast()
151    }
152
153    /// Deallocate memory using defrag deallocator if supported by the
154    /// current Redis server, fallback to regular deallocation otherwise.
155    pub fn defrag_dealloc<T>(&self, ptr: *mut T, layout: Layout) {
156        unsafe { std::alloc::dealloc(ptr.cast(), layout) }
157    }
158
159    /// Defrag a [RedisString]
160    ///
161    /// NOTE: It is only possible to defrag strings that have a single reference.
162    /// Typically this means strings that was copy/cloned using [RedisString::safe_clone]
163    /// or created using [RedisString::new] will not be defrag and will be returned as is.
164    pub fn defrag_redis_string(&self, mut s: RedisString) -> RedisString {
165        let new_inner = unsafe {
166            RedisModule_DefragRedisModuleString
167                .expect("RedisModule_DefragRedisModuleString should be available.")(
168                self.defrag_ctx.as_ptr(),
169                s.inner,
170            )
171        };
172        if !new_inner.is_null() {
173            s.inner = new_inner;
174        }
175        s
176    }
177}
178
179#[distributed_slice()]
180pub static DEFRAG_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
181
182#[distributed_slice()]
183pub static DEFRAG_START_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
184
185#[distributed_slice()]
186pub static DEFRAG_END_FUNCTIONS_LIST: [fn(&DefragContext)] = [..];
187
188extern "C" fn defrag_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
189    let mut ctx = DefragContext {
190        defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
191    };
192    DEFRAG_FUNCTIONS_LIST.iter().for_each(|callback| {
193        callback(&mut ctx);
194    });
195}
196
197extern "C" fn defrag_start_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
198    let mut ctx = DefragContext {
199        defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
200    };
201    DEFRAG_START_FUNCTIONS_LIST.iter().for_each(|callback| {
202        callback(&mut ctx);
203    });
204}
205
206extern "C" fn defrag_end_function(defrag_ctx: *mut raw::RedisModuleDefragCtx) {
207    let mut ctx = DefragContext {
208        defrag_ctx: NonNull::new(defrag_ctx).expect("defrag_ctx is expected to be no NULL"),
209    };
210    DEFRAG_END_FUNCTIONS_LIST.iter().for_each(|callback| {
211        callback(&mut ctx);
212    });
213}
214
215/// Register defrag functions if exists.
216pub fn register_defrag_functions(ctx: &Context) -> Result<(), RedisError> {
217    let register_defrag_function = match unsafe { raw::RedisModule_RegisterDefragFunc } {
218        Some(f) => f,
219        None => {
220            ctx.log_warning("Skip register defrag function as defrag is not supported on the current Redis server.");
221            return Ok(());
222        }
223    };
224    if !DEFRAG_FUNCTIONS_LIST.is_empty() {
225        let res = unsafe { register_defrag_function(ctx.ctx, Some(defrag_function)) };
226        if res != raw::REDISMODULE_OK as i32 {
227            return Err(RedisError::Str("Failed register defrag function"));
228        }
229    }
230
231    let register_defrag_callbacks = match unsafe { raw::RedisModule_RegisterDefragCallbacks } {
232        Some(f) => f,
233        None => {
234            ctx.log_warning("Skip register defrag callbacks as defrag callbacks is not supported on the current Redis server.");
235            return Ok(());
236        }
237    };
238    if !DEFRAG_START_FUNCTIONS_LIST.is_empty() || !DEFRAG_END_FUNCTIONS_LIST.is_empty() {
239        let res = unsafe {
240            register_defrag_callbacks(
241                ctx.ctx,
242                Some(defrag_start_function),
243                Some(defrag_end_function),
244            )
245        };
246        if res != raw::REDISMODULE_OK as i32 {
247            return Err(RedisError::Str("Failed register defrag callbacks"));
248        }
249    }
250
251    Ok(())
252}