1use std::ffi::CStr;
2
3use crate::{context::Context, ValkeyError};
4use crate::{raw, InfoContext, ValkeyResult};
5use linkme::distributed_slice;
6
7#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
8pub enum ServerRole {
9 Primary,
10 Replica,
11}
12
13#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
14pub enum LoadingSubevent {
15 RdbStarted,
16 AofStarted,
17 ReplStarted,
18 Ended,
19 Failed,
20}
21
22#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
23pub enum FlushSubevent {
24 Started,
25 Ended,
26}
27
28#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
29pub enum ModuleChangeSubevent {
30 Loaded,
31 Unloaded,
32}
33
34#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
35pub enum ClientChangeSubevent {
36 Connected,
37 Disconnected,
38}
39
40#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
41pub enum KeyChangeSubevent {
42 Deleted,
43 Expired,
44 Evicted,
45 Overwritten,
46}
47
48#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
49pub enum PersistenceSubevent {
50 RdbStart,
51 AofStart,
52 SyncRdbStart,
53 SyncAofStart,
54 Ended,
55 Failed,
56}
57
58#[derive(Clone)]
59pub enum ServerEventHandler {
60 RoleChanged(fn(&Context, ServerRole)),
61 Loading(fn(&Context, LoadingSubevent)),
62 Flush(fn(&Context, FlushSubevent)),
63 ModuleChange(fn(&Context, ModuleChangeSubevent)),
64 ClientChange(fn(&Context, ClientChangeSubevent)),
65 KeyChangeSubevent(fn(&Context, KeyChangeSubevent)),
66 PersistenceSubevent(fn(&Context, PersistenceSubevent)),
67}
68
69#[distributed_slice()]
70pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
71
72#[distributed_slice()]
73pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
74
75#[distributed_slice()]
76pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
77
78#[distributed_slice()]
79pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
80
81#[distributed_slice()]
82pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
83
84#[distributed_slice()]
85pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
86
87#[distributed_slice()]
88pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> ValkeyResult<()>] = [..];
89
90#[distributed_slice()]
91pub static CLIENT_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ClientChangeSubevent)] = [..];
92
93#[distributed_slice()]
94pub static KEY_SERVER_EVENTS_LIST: [fn(&Context, KeyChangeSubevent)] = [..];
95
96#[distributed_slice()]
97pub static SHUTDOWN_SERVER_EVENT_LIST: [fn(&Context, u64)] = [..];
98
99#[distributed_slice()]
100pub static PERSISTENCE_SERVER_EVENTS_LIST: [fn(&Context, PersistenceSubevent)] = [..];
101
102extern "C" fn cron_callback(
103 ctx: *mut raw::RedisModuleCtx,
104 _eid: raw::RedisModuleEvent,
105 _subevent: u64,
106 data: *mut ::std::os::raw::c_void,
107) {
108 let data: &raw::RedisModuleConfigChangeV1 =
109 unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
110 let ctx = Context::new(ctx);
111 CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
112 callback(&ctx, data.version);
113 });
114}
115
116extern "C" fn role_changed_callback(
117 ctx: *mut raw::RedisModuleCtx,
118 _eid: raw::RedisModuleEvent,
119 subevent: u64,
120 _data: *mut ::std::os::raw::c_void,
121) {
122 let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
123 ServerRole::Primary
124 } else {
125 ServerRole::Replica
126 };
127 let ctx = Context::new(ctx);
128 ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
129 callback(&ctx, new_role);
130 });
131}
132
133extern "C" fn loading_event_callback(
134 ctx: *mut raw::RedisModuleCtx,
135 _eid: raw::RedisModuleEvent,
136 subevent: u64,
137 _data: *mut ::std::os::raw::c_void,
138) {
139 let loading_sub_event = match subevent {
140 raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
141 raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
142 raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
143 raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
144 _ => LoadingSubevent::Failed,
145 };
146 let ctx = Context::new(ctx);
147 LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
148 callback(&ctx, loading_sub_event);
149 });
150}
151
152extern "C" fn flush_event_callback(
153 ctx: *mut raw::RedisModuleCtx,
154 _eid: raw::RedisModuleEvent,
155 subevent: u64,
156 _data: *mut ::std::os::raw::c_void,
157) {
158 let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
159 FlushSubevent::Started
160 } else {
161 FlushSubevent::Ended
162 };
163 let ctx = Context::new(ctx);
164 FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
165 callback(&ctx, flush_sub_event);
166 });
167}
168
169extern "C" fn module_change_event_callback(
170 ctx: *mut raw::RedisModuleCtx,
171 _eid: raw::RedisModuleEvent,
172 subevent: u64,
173 _data: *mut ::std::os::raw::c_void,
174) {
175 let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
176 ModuleChangeSubevent::Loaded
177 } else {
178 ModuleChangeSubevent::Unloaded
179 };
180 let ctx = Context::new(ctx);
181 MODULE_CHANGED_SERVER_EVENTS_LIST
182 .iter()
183 .for_each(|callback| {
184 callback(&ctx, module_changed_sub_event);
185 });
186}
187
188extern "C" fn client_change_event_callback(
189 ctx: *mut raw::RedisModuleCtx,
190 _eid: raw::RedisModuleEvent,
191 subevent: u64,
192 _data: *mut ::std::os::raw::c_void,
193) {
194 let client_change_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED {
195 ClientChangeSubevent::Connected
196 } else {
197 ClientChangeSubevent::Disconnected
198 };
199 let ctx = Context::new(ctx);
200 CLIENT_CHANGED_SERVER_EVENTS_LIST
201 .iter()
202 .for_each(|callback| {
203 callback(&ctx, client_change_sub_event);
204 });
205}
206
207extern "C" fn key_event_callback(
208 ctx: *mut raw::RedisModuleCtx,
209 _eid: raw::RedisModuleEvent,
210 subevent: u64,
211 _data: *mut ::std::os::raw::c_void,
212) {
213 let key_change_sub_event = match subevent {
214 raw::REDISMODULE_SUBEVENT_KEY_DELETED => KeyChangeSubevent::Deleted,
215 raw::REDISMODULE_SUBEVENT_KEY_EXPIRED => KeyChangeSubevent::Expired,
216 raw::REDISMODULE_SUBEVENT_KEY_EVICTED => KeyChangeSubevent::Evicted,
217 raw::REDISMODULE_SUBEVENT_KEY_OVERWRITTEN => KeyChangeSubevent::Overwritten,
218 _ => return,
219 };
220 let ctx = Context::new(ctx);
221 KEY_SERVER_EVENTS_LIST.iter().for_each(|callback| {
222 callback(&ctx, key_change_sub_event);
223 });
224}
225
226extern "C" fn server_shutdown_callback(
227 ctx: *mut raw::RedisModuleCtx,
228 _eid: raw::RedisModuleEvent,
229 subevent: u64,
230 _data: *mut ::std::os::raw::c_void,
231) {
232 let ctx = Context::new(ctx);
233 SHUTDOWN_SERVER_EVENT_LIST.iter().for_each(|callback| {
234 callback(&ctx, subevent);
235 });
236}
237
238extern "C" fn persistence_event_callback(
239 ctx: *mut raw::RedisModuleCtx,
240 _eid: raw::RedisModuleEvent,
241 subevent: u64,
242 _data: *mut ::std::os::raw::c_void,
243) {
244 let persistence_sub_event = match subevent {
245 raw::REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START => PersistenceSubevent::RdbStart,
246 raw::REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START => PersistenceSubevent::AofStart,
247 raw::REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START => PersistenceSubevent::SyncRdbStart,
248 raw::REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START => PersistenceSubevent::SyncAofStart,
249 raw::REDISMODULE_SUBEVENT_PERSISTENCE_ENDED => PersistenceSubevent::Ended,
250 raw::REDISMODULE_SUBEVENT_PERSISTENCE_FAILED => PersistenceSubevent::Failed,
251 _ => return,
252 };
253 let ctx = Context::new(ctx);
254 PERSISTENCE_SERVER_EVENTS_LIST.iter().for_each(|callback| {
255 callback(&ctx, persistence_sub_event);
256 });
257}
258
259extern "C" fn config_change_event_callback(
260 ctx: *mut raw::RedisModuleCtx,
261 _eid: raw::RedisModuleEvent,
262 _subevent: u64,
263 data: *mut ::std::os::raw::c_void,
264) {
265 let data: &raw::RedisModuleConfigChange =
266 unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
267 let config_names: Vec<_> = (0..data.num_changes)
268 .map(|i| unsafe {
269 let name = *data.config_names.offset(i as isize);
270 CStr::from_ptr(name)
271 })
272 .collect();
273 let config_names: Vec<_> = config_names
274 .iter()
275 .map(|v| {
276 v.to_str()
277 .expect("Got a configuration name which is not a valid utf8")
278 })
279 .collect();
280 let ctx = Context::new(ctx);
281 CONFIG_CHANGED_SERVER_EVENTS_LIST
282 .iter()
283 .for_each(|callback| {
284 callback(&ctx, config_names.as_slice());
285 });
286}
287
288fn register_single_server_event_type<T>(
289 ctx: &Context,
290 callbacks: &[fn(&Context, T)],
291 server_event: u64,
292 inner_callback: raw::RedisModuleEventCallback,
293) -> Result<(), ValkeyError> {
294 if !callbacks.is_empty() {
295 let res = unsafe {
296 raw::RedisModule_SubscribeToServerEvent.unwrap()(
297 ctx.ctx,
298 raw::RedisModuleEvent {
299 id: server_event,
300 dataver: 1,
301 },
302 inner_callback,
303 )
304 };
305 if res != raw::REDISMODULE_OK as i32 {
306 return Err(ValkeyError::Str("Failed subscribing to server event"));
307 }
308 }
309
310 Ok(())
311}
312
313pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
314 register_single_server_event_type(
315 ctx,
316 &ROLE_CHANGED_SERVER_EVENTS_LIST,
317 raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
318 Some(role_changed_callback),
319 )?;
320 register_single_server_event_type(
321 ctx,
322 &LOADING_SERVER_EVENTS_LIST,
323 raw::REDISMODULE_EVENT_LOADING,
324 Some(loading_event_callback),
325 )?;
326 register_single_server_event_type(
327 ctx,
328 &FLUSH_SERVER_EVENTS_LIST,
329 raw::REDISMODULE_EVENT_FLUSHDB,
330 Some(flush_event_callback),
331 )?;
332 register_single_server_event_type(
333 ctx,
334 &MODULE_CHANGED_SERVER_EVENTS_LIST,
335 raw::REDISMODULE_EVENT_MODULE_CHANGE,
336 Some(module_change_event_callback),
337 )?;
338 register_single_server_event_type(
339 ctx,
340 &CLIENT_CHANGED_SERVER_EVENTS_LIST,
341 raw::REDISMODULE_EVENT_CLIENT_CHANGE,
342 Some(client_change_event_callback),
343 )?;
344 register_single_server_event_type(
345 ctx,
346 &CONFIG_CHANGED_SERVER_EVENTS_LIST,
347 raw::REDISMODULE_EVENT_CONFIG,
348 Some(config_change_event_callback),
349 )?;
350 register_single_server_event_type(
351 ctx,
352 &CRON_SERVER_EVENTS_LIST,
353 raw::REDISMODULE_EVENT_CRON_LOOP,
354 Some(cron_callback),
355 )?;
356 register_single_server_event_type(
357 ctx,
358 &KEY_SERVER_EVENTS_LIST,
359 raw::REDISMODULE_EVENT_KEY,
360 Some(key_event_callback),
361 )?;
362 register_single_server_event_type(
363 ctx,
364 &SHUTDOWN_SERVER_EVENT_LIST,
365 raw::REDISMODULE_EVENT_SHUTDOWN,
366 Some(server_shutdown_callback),
367 )?;
368 register_single_server_event_type(
369 ctx,
370 &PERSISTENCE_SERVER_EVENTS_LIST,
371 raw::REDISMODULE_EVENT_PERSISTENCE,
372 Some(persistence_event_callback),
373 )?;
374 Ok(())
375}