valkey_module/context/
server_events.rs

1use std::ffi::CStr;
2
3use crate::{context::Context, ValkeyError};
4use crate::{raw, InfoContext, ValkeyResult};
5use linkme::distributed_slice;
6
7#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
8pub enum ServerRole {
9    Primary,
10    Replica,
11}
12
13#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
14pub enum LoadingSubevent {
15    RdbStarted,
16    AofStarted,
17    ReplStarted,
18    Ended,
19    Failed,
20}
21
22#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
23pub enum FlushSubevent {
24    Started,
25    Ended,
26}
27
28#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
29pub enum ModuleChangeSubevent {
30    Loaded,
31    Unloaded,
32}
33
34#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
35pub enum ClientChangeSubevent {
36    Connected,
37    Disconnected,
38}
39
40#[derive(Clone)]
41pub enum ServerEventHandler {
42    RoleChanged(fn(&Context, ServerRole)),
43    Loading(fn(&Context, LoadingSubevent)),
44    Flush(fn(&Context, FlushSubevent)),
45    ModuleChange(fn(&Context, ModuleChangeSubevent)),
46    ClientChange(fn(&Context, ClientChangeSubevent)),
47}
48
49#[distributed_slice()]
50pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
51
52#[distributed_slice()]
53pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
54
55#[distributed_slice()]
56pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
57
58#[distributed_slice()]
59pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
60
61#[distributed_slice()]
62pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
63
64#[distributed_slice()]
65pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
66
67#[distributed_slice()]
68pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> ValkeyResult<()>] = [..];
69
70#[distributed_slice()]
71pub static CLIENT_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ClientChangeSubevent)] = [..];
72
73extern "C" fn cron_callback(
74    ctx: *mut raw::RedisModuleCtx,
75    _eid: raw::RedisModuleEvent,
76    _subevent: u64,
77    data: *mut ::std::os::raw::c_void,
78) {
79    let data: &raw::RedisModuleConfigChangeV1 =
80        unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
81    let ctx = Context::new(ctx);
82    CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
83        callback(&ctx, data.version);
84    });
85}
86
87extern "C" fn role_changed_callback(
88    ctx: *mut raw::RedisModuleCtx,
89    _eid: raw::RedisModuleEvent,
90    subevent: u64,
91    _data: *mut ::std::os::raw::c_void,
92) {
93    let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
94        ServerRole::Primary
95    } else {
96        ServerRole::Replica
97    };
98    let ctx = Context::new(ctx);
99    ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
100        callback(&ctx, new_role);
101    });
102}
103
104extern "C" fn loading_event_callback(
105    ctx: *mut raw::RedisModuleCtx,
106    _eid: raw::RedisModuleEvent,
107    subevent: u64,
108    _data: *mut ::std::os::raw::c_void,
109) {
110    let loading_sub_event = match subevent {
111        raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
112        raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
113        raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
114        raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
115        _ => LoadingSubevent::Failed,
116    };
117    let ctx = Context::new(ctx);
118    LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
119        callback(&ctx, loading_sub_event);
120    });
121}
122
123extern "C" fn flush_event_callback(
124    ctx: *mut raw::RedisModuleCtx,
125    _eid: raw::RedisModuleEvent,
126    subevent: u64,
127    _data: *mut ::std::os::raw::c_void,
128) {
129    let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
130        FlushSubevent::Started
131    } else {
132        FlushSubevent::Ended
133    };
134    let ctx = Context::new(ctx);
135    FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
136        callback(&ctx, flush_sub_event);
137    });
138}
139
140extern "C" fn module_change_event_callback(
141    ctx: *mut raw::RedisModuleCtx,
142    _eid: raw::RedisModuleEvent,
143    subevent: u64,
144    _data: *mut ::std::os::raw::c_void,
145) {
146    let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
147        ModuleChangeSubevent::Loaded
148    } else {
149        ModuleChangeSubevent::Unloaded
150    };
151    let ctx = Context::new(ctx);
152    MODULE_CHANGED_SERVER_EVENTS_LIST
153        .iter()
154        .for_each(|callback| {
155            callback(&ctx, module_changed_sub_event);
156        });
157}
158
159extern "C" fn client_change_event_callback(
160    ctx: *mut raw::RedisModuleCtx,
161    _eid: raw::RedisModuleEvent,
162    subevent: u64,
163    _data: *mut ::std::os::raw::c_void,
164) {
165    let client_change_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED {
166        ClientChangeSubevent::Connected
167    } else {
168        ClientChangeSubevent::Disconnected
169    };
170    let ctx = Context::new(ctx);
171    CLIENT_CHANGED_SERVER_EVENTS_LIST
172        .iter()
173        .for_each(|callback| {
174            callback(&ctx, client_change_sub_event);
175        });
176}
177
178extern "C" fn config_change_event_callback(
179    ctx: *mut raw::RedisModuleCtx,
180    _eid: raw::RedisModuleEvent,
181    _subevent: u64,
182    data: *mut ::std::os::raw::c_void,
183) {
184    let data: &raw::RedisModuleConfigChange =
185        unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
186    let config_names: Vec<_> = (0..data.num_changes)
187        .map(|i| unsafe {
188            let name = *data.config_names.offset(i as isize);
189            CStr::from_ptr(name)
190        })
191        .collect();
192    let config_names: Vec<_> = config_names
193        .iter()
194        .map(|v| {
195            v.to_str()
196                .expect("Got a configuration name which is not a valid utf8")
197        })
198        .collect();
199    let ctx = Context::new(ctx);
200    CONFIG_CHANGED_SERVER_EVENTS_LIST
201        .iter()
202        .for_each(|callback| {
203            callback(&ctx, config_names.as_slice());
204        });
205}
206
207fn register_single_server_event_type<T>(
208    ctx: &Context,
209    callbacks: &[fn(&Context, T)],
210    server_event: u64,
211    inner_callback: raw::RedisModuleEventCallback,
212) -> Result<(), ValkeyError> {
213    if !callbacks.is_empty() {
214        let res = unsafe {
215            raw::RedisModule_SubscribeToServerEvent.unwrap()(
216                ctx.ctx,
217                raw::RedisModuleEvent {
218                    id: server_event,
219                    dataver: 1,
220                },
221                inner_callback,
222            )
223        };
224        if res != raw::REDISMODULE_OK as i32 {
225            return Err(ValkeyError::Str("Failed subscribing to server event"));
226        }
227    }
228
229    Ok(())
230}
231
232pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
233    register_single_server_event_type(
234        ctx,
235        &ROLE_CHANGED_SERVER_EVENTS_LIST,
236        raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
237        Some(role_changed_callback),
238    )?;
239    register_single_server_event_type(
240        ctx,
241        &LOADING_SERVER_EVENTS_LIST,
242        raw::REDISMODULE_EVENT_LOADING,
243        Some(loading_event_callback),
244    )?;
245    register_single_server_event_type(
246        ctx,
247        &FLUSH_SERVER_EVENTS_LIST,
248        raw::REDISMODULE_EVENT_FLUSHDB,
249        Some(flush_event_callback),
250    )?;
251    register_single_server_event_type(
252        ctx,
253        &MODULE_CHANGED_SERVER_EVENTS_LIST,
254        raw::REDISMODULE_EVENT_MODULE_CHANGE,
255        Some(module_change_event_callback),
256    )?;
257    register_single_server_event_type(
258        ctx,
259        &CLIENT_CHANGED_SERVER_EVENTS_LIST,
260        raw::REDISMODULE_EVENT_CLIENT_CHANGE,
261        Some(client_change_event_callback),
262    )?;
263    register_single_server_event_type(
264        ctx,
265        &CONFIG_CHANGED_SERVER_EVENTS_LIST,
266        raw::REDISMODULE_EVENT_CONFIG,
267        Some(config_change_event_callback),
268    )?;
269    register_single_server_event_type(
270        ctx,
271        &CRON_SERVER_EVENTS_LIST,
272        raw::REDISMODULE_EVENT_CRON_LOOP,
273        Some(cron_callback),
274    )?;
275    Ok(())
276}