valkey_module/context/
server_events.rs

1use std::ffi::CStr;
2
3use crate::{context::Context, ValkeyError};
4use crate::{raw, InfoContext, ValkeyResult};
5use linkme::distributed_slice;
6
/// Replication role handed to the role-changed handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ServerRole {
    /// Reported when the role-change subevent equals
    /// `REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER`.
    Primary,
    /// Reported for every other role-change subevent.
    Replica,
}
12
/// Stage of a data-loading operation, handed to the loading handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LoadingSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_RDB_START`.
    RdbStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_AOF_START`.
    AofStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_REPL_START`.
    ReplStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_ENDED`.
    Ended,
    /// The load was reported as failed.
    Failed,
}
21
/// Stage of a flush operation, handed to the flush handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FlushSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_FLUSHDB_START`.
    Started,
    /// Reported for every other flush subevent.
    Ended,
}
27
/// Kind of module change, handed to the module-change handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ModuleChangeSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_MODULE_LOADED`.
    Loaded,
    /// Reported for every other module-change subevent.
    Unloaded,
}
33
/// Kind of client change, handed to the client-change handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientChangeSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED`.
    Connected,
    /// Reported for every other client-change subevent.
    Disconnected,
}
39
/// Kind of key change, handed to the key-event handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum KeyChangeSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_KEY_DELETED`.
    Deleted,
    /// Mapped from `REDISMODULE_SUBEVENT_KEY_EXPIRED`.
    Expired,
    /// Mapped from `REDISMODULE_SUBEVENT_KEY_EVICTED`.
    Evicted,
    /// Mapped from `REDISMODULE_SUBEVENT_KEY_OVERWRITTEN`.
    Overwritten,
}
47
/// Stage of a persistence operation, handed to the persistence handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PersistenceSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START`.
    RdbStart,
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START`.
    AofStart,
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START`.
    SyncRdbStart,
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START`.
    SyncAofStart,
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_ENDED`.
    Ended,
    /// Mapped from `REDISMODULE_SUBEVENT_PERSISTENCE_FAILED`.
    Failed,
}
57
58#[derive(Clone)]
59pub enum ServerEventHandler {
60    RoleChanged(fn(&Context, ServerRole)),
61    Loading(fn(&Context, LoadingSubevent)),
62    Flush(fn(&Context, FlushSubevent)),
63    ModuleChange(fn(&Context, ModuleChangeSubevent)),
64    ClientChange(fn(&Context, ClientChangeSubevent)),
65    KeyChangeSubevent(fn(&Context, KeyChangeSubevent)),
66    PersistenceSubevent(fn(&Context, PersistenceSubevent)),
67}
68
69#[distributed_slice()]
70pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
71
72#[distributed_slice()]
73pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
74
75#[distributed_slice()]
76pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
77
78#[distributed_slice()]
79pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
80
81#[distributed_slice()]
82pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
83
84#[distributed_slice()]
85pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
86
87#[distributed_slice()]
88pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> ValkeyResult<()>] = [..];
89
90#[distributed_slice()]
91pub static CLIENT_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ClientChangeSubevent)] = [..];
92
93#[distributed_slice()]
94pub static KEY_SERVER_EVENTS_LIST: [fn(&Context, KeyChangeSubevent)] = [..];
95
96#[distributed_slice()]
97pub static SHUTDOWN_SERVER_EVENT_LIST: [fn(&Context, u64)] = [..];
98
99#[distributed_slice()]
100pub static PERSISTENCE_SERVER_EVENTS_LIST: [fn(&Context, PersistenceSubevent)] = [..];
101
102extern "C" fn cron_callback(
103    ctx: *mut raw::RedisModuleCtx,
104    _eid: raw::RedisModuleEvent,
105    _subevent: u64,
106    data: *mut ::std::os::raw::c_void,
107) {
108    let data: &raw::RedisModuleConfigChangeV1 =
109        unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
110    let ctx = Context::new(ctx);
111    CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
112        callback(&ctx, data.version);
113    });
114}
115
116extern "C" fn role_changed_callback(
117    ctx: *mut raw::RedisModuleCtx,
118    _eid: raw::RedisModuleEvent,
119    subevent: u64,
120    _data: *mut ::std::os::raw::c_void,
121) {
122    let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
123        ServerRole::Primary
124    } else {
125        ServerRole::Replica
126    };
127    let ctx = Context::new(ctx);
128    ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
129        callback(&ctx, new_role);
130    });
131}
132
133extern "C" fn loading_event_callback(
134    ctx: *mut raw::RedisModuleCtx,
135    _eid: raw::RedisModuleEvent,
136    subevent: u64,
137    _data: *mut ::std::os::raw::c_void,
138) {
139    let loading_sub_event = match subevent {
140        raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
141        raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
142        raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
143        raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
144        _ => LoadingSubevent::Failed,
145    };
146    let ctx = Context::new(ctx);
147    LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
148        callback(&ctx, loading_sub_event);
149    });
150}
151
152extern "C" fn flush_event_callback(
153    ctx: *mut raw::RedisModuleCtx,
154    _eid: raw::RedisModuleEvent,
155    subevent: u64,
156    _data: *mut ::std::os::raw::c_void,
157) {
158    let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
159        FlushSubevent::Started
160    } else {
161        FlushSubevent::Ended
162    };
163    let ctx = Context::new(ctx);
164    FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
165        callback(&ctx, flush_sub_event);
166    });
167}
168
169extern "C" fn module_change_event_callback(
170    ctx: *mut raw::RedisModuleCtx,
171    _eid: raw::RedisModuleEvent,
172    subevent: u64,
173    _data: *mut ::std::os::raw::c_void,
174) {
175    let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
176        ModuleChangeSubevent::Loaded
177    } else {
178        ModuleChangeSubevent::Unloaded
179    };
180    let ctx = Context::new(ctx);
181    MODULE_CHANGED_SERVER_EVENTS_LIST
182        .iter()
183        .for_each(|callback| {
184            callback(&ctx, module_changed_sub_event);
185        });
186}
187
188extern "C" fn client_change_event_callback(
189    ctx: *mut raw::RedisModuleCtx,
190    _eid: raw::RedisModuleEvent,
191    subevent: u64,
192    _data: *mut ::std::os::raw::c_void,
193) {
194    let client_change_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED {
195        ClientChangeSubevent::Connected
196    } else {
197        ClientChangeSubevent::Disconnected
198    };
199    let ctx = Context::new(ctx);
200    CLIENT_CHANGED_SERVER_EVENTS_LIST
201        .iter()
202        .for_each(|callback| {
203            callback(&ctx, client_change_sub_event);
204        });
205}
206
207extern "C" fn key_event_callback(
208    ctx: *mut raw::RedisModuleCtx,
209    _eid: raw::RedisModuleEvent,
210    subevent: u64,
211    _data: *mut ::std::os::raw::c_void,
212) {
213    let key_change_sub_event = match subevent {
214        raw::REDISMODULE_SUBEVENT_KEY_DELETED => KeyChangeSubevent::Deleted,
215        raw::REDISMODULE_SUBEVENT_KEY_EXPIRED => KeyChangeSubevent::Expired,
216        raw::REDISMODULE_SUBEVENT_KEY_EVICTED => KeyChangeSubevent::Evicted,
217        raw::REDISMODULE_SUBEVENT_KEY_OVERWRITTEN => KeyChangeSubevent::Overwritten,
218        _ => return,
219    };
220    let ctx = Context::new(ctx);
221    KEY_SERVER_EVENTS_LIST.iter().for_each(|callback| {
222        callback(&ctx, key_change_sub_event);
223    });
224}
225
226extern "C" fn server_shutdown_callback(
227    ctx: *mut raw::RedisModuleCtx,
228    _eid: raw::RedisModuleEvent,
229    subevent: u64,
230    _data: *mut ::std::os::raw::c_void,
231) {
232    let ctx = Context::new(ctx);
233    SHUTDOWN_SERVER_EVENT_LIST.iter().for_each(|callback| {
234        callback(&ctx, subevent);
235    });
236}
237
238extern "C" fn persistence_event_callback(
239    ctx: *mut raw::RedisModuleCtx,
240    _eid: raw::RedisModuleEvent,
241    subevent: u64,
242    _data: *mut ::std::os::raw::c_void,
243) {
244    let persistence_sub_event = match subevent {
245        raw::REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START => PersistenceSubevent::RdbStart,
246        raw::REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START => PersistenceSubevent::AofStart,
247        raw::REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START => PersistenceSubevent::SyncRdbStart,
248        raw::REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START => PersistenceSubevent::SyncAofStart,
249        raw::REDISMODULE_SUBEVENT_PERSISTENCE_ENDED => PersistenceSubevent::Ended,
250        raw::REDISMODULE_SUBEVENT_PERSISTENCE_FAILED => PersistenceSubevent::Failed,
251        _ => return,
252    };
253    let ctx = Context::new(ctx);
254    PERSISTENCE_SERVER_EVENTS_LIST.iter().for_each(|callback| {
255        callback(&ctx, persistence_sub_event);
256    });
257}
258
259extern "C" fn config_change_event_callback(
260    ctx: *mut raw::RedisModuleCtx,
261    _eid: raw::RedisModuleEvent,
262    _subevent: u64,
263    data: *mut ::std::os::raw::c_void,
264) {
265    let data: &raw::RedisModuleConfigChange =
266        unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
267    let config_names: Vec<_> = (0..data.num_changes)
268        .map(|i| unsafe {
269            let name = *data.config_names.offset(i as isize);
270            CStr::from_ptr(name)
271        })
272        .collect();
273    let config_names: Vec<_> = config_names
274        .iter()
275        .map(|v| {
276            v.to_str()
277                .expect("Got a configuration name which is not a valid utf8")
278        })
279        .collect();
280    let ctx = Context::new(ctx);
281    CONFIG_CHANGED_SERVER_EVENTS_LIST
282        .iter()
283        .for_each(|callback| {
284            callback(&ctx, config_names.as_slice());
285        });
286}
287
288fn register_single_server_event_type<T>(
289    ctx: &Context,
290    callbacks: &[fn(&Context, T)],
291    server_event: u64,
292    inner_callback: raw::RedisModuleEventCallback,
293) -> Result<(), ValkeyError> {
294    if !callbacks.is_empty() {
295        let res = unsafe {
296            raw::RedisModule_SubscribeToServerEvent.unwrap()(
297                ctx.ctx,
298                raw::RedisModuleEvent {
299                    id: server_event,
300                    dataver: 1,
301                },
302                inner_callback,
303            )
304        };
305        if res != raw::REDISMODULE_OK as i32 {
306            return Err(ValkeyError::Str("Failed subscribing to server event"));
307        }
308    }
309
310    Ok(())
311}
312
313pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
314    register_single_server_event_type(
315        ctx,
316        &ROLE_CHANGED_SERVER_EVENTS_LIST,
317        raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
318        Some(role_changed_callback),
319    )?;
320    register_single_server_event_type(
321        ctx,
322        &LOADING_SERVER_EVENTS_LIST,
323        raw::REDISMODULE_EVENT_LOADING,
324        Some(loading_event_callback),
325    )?;
326    register_single_server_event_type(
327        ctx,
328        &FLUSH_SERVER_EVENTS_LIST,
329        raw::REDISMODULE_EVENT_FLUSHDB,
330        Some(flush_event_callback),
331    )?;
332    register_single_server_event_type(
333        ctx,
334        &MODULE_CHANGED_SERVER_EVENTS_LIST,
335        raw::REDISMODULE_EVENT_MODULE_CHANGE,
336        Some(module_change_event_callback),
337    )?;
338    register_single_server_event_type(
339        ctx,
340        &CLIENT_CHANGED_SERVER_EVENTS_LIST,
341        raw::REDISMODULE_EVENT_CLIENT_CHANGE,
342        Some(client_change_event_callback),
343    )?;
344    register_single_server_event_type(
345        ctx,
346        &CONFIG_CHANGED_SERVER_EVENTS_LIST,
347        raw::REDISMODULE_EVENT_CONFIG,
348        Some(config_change_event_callback),
349    )?;
350    register_single_server_event_type(
351        ctx,
352        &CRON_SERVER_EVENTS_LIST,
353        raw::REDISMODULE_EVENT_CRON_LOOP,
354        Some(cron_callback),
355    )?;
356    register_single_server_event_type(
357        ctx,
358        &KEY_SERVER_EVENTS_LIST,
359        raw::REDISMODULE_EVENT_KEY,
360        Some(key_event_callback),
361    )?;
362    register_single_server_event_type(
363        ctx,
364        &SHUTDOWN_SERVER_EVENT_LIST,
365        raw::REDISMODULE_EVENT_SHUTDOWN,
366        Some(server_shutdown_callback),
367    )?;
368    register_single_server_event_type(
369        ctx,
370        &PERSISTENCE_SERVER_EVENTS_LIST,
371        raw::REDISMODULE_EVENT_PERSISTENCE,
372        Some(persistence_event_callback),
373    )?;
374    Ok(())
375}