1use std::ffi::CStr;
2
3use crate::{context::Context, ValkeyError};
4use crate::{raw, InfoContext, ValkeyResult};
5use linkme::distributed_slice;
6
7#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
8pub enum ServerRole {
9 Primary,
10 Replica,
11}
12
13#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
14pub enum LoadingSubevent {
15 RdbStarted,
16 AofStarted,
17 ReplStarted,
18 Ended,
19 Failed,
20}
21
22#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
23pub enum FlushSubevent {
24 Started,
25 Ended,
26}
27
28#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
29pub enum ModuleChangeSubevent {
30 Loaded,
31 Unloaded,
32}
33
34#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
35pub enum ClientChangeSubevent {
36 Connected,
37 Disconnected,
38}
39
40#[derive(Clone)]
41pub enum ServerEventHandler {
42 RoleChanged(fn(&Context, ServerRole)),
43 Loading(fn(&Context, LoadingSubevent)),
44 Flush(fn(&Context, FlushSubevent)),
45 ModuleChange(fn(&Context, ModuleChangeSubevent)),
46 ClientChange(fn(&Context, ClientChangeSubevent)),
47}
48
49#[distributed_slice()]
50pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
51
52#[distributed_slice()]
53pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
54
55#[distributed_slice()]
56pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
57
58#[distributed_slice()]
59pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
60
61#[distributed_slice()]
62pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
63
64#[distributed_slice()]
65pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
66
67#[distributed_slice()]
68pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> ValkeyResult<()>] = [..];
69
70#[distributed_slice()]
71pub static CLIENT_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ClientChangeSubevent)] = [..];
72
73extern "C" fn cron_callback(
74 ctx: *mut raw::RedisModuleCtx,
75 _eid: raw::RedisModuleEvent,
76 _subevent: u64,
77 data: *mut ::std::os::raw::c_void,
78) {
79 let data: &raw::RedisModuleConfigChangeV1 =
80 unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
81 let ctx = Context::new(ctx);
82 CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
83 callback(&ctx, data.version);
84 });
85}
86
87extern "C" fn role_changed_callback(
88 ctx: *mut raw::RedisModuleCtx,
89 _eid: raw::RedisModuleEvent,
90 subevent: u64,
91 _data: *mut ::std::os::raw::c_void,
92) {
93 let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
94 ServerRole::Primary
95 } else {
96 ServerRole::Replica
97 };
98 let ctx = Context::new(ctx);
99 ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
100 callback(&ctx, new_role);
101 });
102}
103
104extern "C" fn loading_event_callback(
105 ctx: *mut raw::RedisModuleCtx,
106 _eid: raw::RedisModuleEvent,
107 subevent: u64,
108 _data: *mut ::std::os::raw::c_void,
109) {
110 let loading_sub_event = match subevent {
111 raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
112 raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
113 raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
114 raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
115 _ => LoadingSubevent::Failed,
116 };
117 let ctx = Context::new(ctx);
118 LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
119 callback(&ctx, loading_sub_event);
120 });
121}
122
123extern "C" fn flush_event_callback(
124 ctx: *mut raw::RedisModuleCtx,
125 _eid: raw::RedisModuleEvent,
126 subevent: u64,
127 _data: *mut ::std::os::raw::c_void,
128) {
129 let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
130 FlushSubevent::Started
131 } else {
132 FlushSubevent::Ended
133 };
134 let ctx = Context::new(ctx);
135 FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
136 callback(&ctx, flush_sub_event);
137 });
138}
139
140extern "C" fn module_change_event_callback(
141 ctx: *mut raw::RedisModuleCtx,
142 _eid: raw::RedisModuleEvent,
143 subevent: u64,
144 _data: *mut ::std::os::raw::c_void,
145) {
146 let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
147 ModuleChangeSubevent::Loaded
148 } else {
149 ModuleChangeSubevent::Unloaded
150 };
151 let ctx = Context::new(ctx);
152 MODULE_CHANGED_SERVER_EVENTS_LIST
153 .iter()
154 .for_each(|callback| {
155 callback(&ctx, module_changed_sub_event);
156 });
157}
158
159extern "C" fn client_change_event_callback(
160 ctx: *mut raw::RedisModuleCtx,
161 _eid: raw::RedisModuleEvent,
162 subevent: u64,
163 _data: *mut ::std::os::raw::c_void,
164) {
165 let client_change_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED {
166 ClientChangeSubevent::Connected
167 } else {
168 ClientChangeSubevent::Disconnected
169 };
170 let ctx = Context::new(ctx);
171 CLIENT_CHANGED_SERVER_EVENTS_LIST
172 .iter()
173 .for_each(|callback| {
174 callback(&ctx, client_change_sub_event);
175 });
176}
177
178extern "C" fn config_change_event_callback(
179 ctx: *mut raw::RedisModuleCtx,
180 _eid: raw::RedisModuleEvent,
181 _subevent: u64,
182 data: *mut ::std::os::raw::c_void,
183) {
184 let data: &raw::RedisModuleConfigChange =
185 unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
186 let config_names: Vec<_> = (0..data.num_changes)
187 .map(|i| unsafe {
188 let name = *data.config_names.offset(i as isize);
189 CStr::from_ptr(name)
190 })
191 .collect();
192 let config_names: Vec<_> = config_names
193 .iter()
194 .map(|v| {
195 v.to_str()
196 .expect("Got a configuration name which is not a valid utf8")
197 })
198 .collect();
199 let ctx = Context::new(ctx);
200 CONFIG_CHANGED_SERVER_EVENTS_LIST
201 .iter()
202 .for_each(|callback| {
203 callback(&ctx, config_names.as_slice());
204 });
205}
206
207fn register_single_server_event_type<T>(
208 ctx: &Context,
209 callbacks: &[fn(&Context, T)],
210 server_event: u64,
211 inner_callback: raw::RedisModuleEventCallback,
212) -> Result<(), ValkeyError> {
213 if !callbacks.is_empty() {
214 let res = unsafe {
215 raw::RedisModule_SubscribeToServerEvent.unwrap()(
216 ctx.ctx,
217 raw::RedisModuleEvent {
218 id: server_event,
219 dataver: 1,
220 },
221 inner_callback,
222 )
223 };
224 if res != raw::REDISMODULE_OK as i32 {
225 return Err(ValkeyError::Str("Failed subscribing to server event"));
226 }
227 }
228
229 Ok(())
230}
231
232pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
233 register_single_server_event_type(
234 ctx,
235 &ROLE_CHANGED_SERVER_EVENTS_LIST,
236 raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
237 Some(role_changed_callback),
238 )?;
239 register_single_server_event_type(
240 ctx,
241 &LOADING_SERVER_EVENTS_LIST,
242 raw::REDISMODULE_EVENT_LOADING,
243 Some(loading_event_callback),
244 )?;
245 register_single_server_event_type(
246 ctx,
247 &FLUSH_SERVER_EVENTS_LIST,
248 raw::REDISMODULE_EVENT_FLUSHDB,
249 Some(flush_event_callback),
250 )?;
251 register_single_server_event_type(
252 ctx,
253 &MODULE_CHANGED_SERVER_EVENTS_LIST,
254 raw::REDISMODULE_EVENT_MODULE_CHANGE,
255 Some(module_change_event_callback),
256 )?;
257 register_single_server_event_type(
258 ctx,
259 &CLIENT_CHANGED_SERVER_EVENTS_LIST,
260 raw::REDISMODULE_EVENT_CLIENT_CHANGE,
261 Some(client_change_event_callback),
262 )?;
263 register_single_server_event_type(
264 ctx,
265 &CONFIG_CHANGED_SERVER_EVENTS_LIST,
266 raw::REDISMODULE_EVENT_CONFIG,
267 Some(config_change_event_callback),
268 )?;
269 register_single_server_event_type(
270 ctx,
271 &CRON_SERVER_EVENTS_LIST,
272 raw::REDISMODULE_EVENT_CRON_LOOP,
273 Some(cron_callback),
274 )?;
275 Ok(())
276}