1use std::ffi::CStr;
2
3use crate::{context::Context, RedisError};
4use crate::{raw, InfoContext, RedisResult};
5use linkme::distributed_slice;
6
7#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
8pub enum ServerRole {
9 Primary,
10 Replica,
11}
12
13#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
14pub enum LoadingSubevent {
15 RdbStarted,
16 AofStarted,
17 ReplStarted,
18 Ended,
19 Failed,
20}
21
22#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
23pub enum FlushSubevent {
24 Started,
25 Ended,
26}
27
28#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
29pub enum ModuleChangeSubevent {
30 Loaded,
31 Unloaded,
32}
33
34#[derive(Clone)]
35pub enum ServerEventHandler {
36 RuleChanged(fn(&Context, ServerRole)),
37 Loading(fn(&Context, LoadingSubevent)),
38 Flush(fn(&Context, FlushSubevent)),
39 ModuleChange(fn(&Context, ModuleChangeSubevent)),
40}
41
42#[distributed_slice()]
43pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
44
45#[distributed_slice()]
46pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
47
48#[distributed_slice()]
49pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
50
51#[distributed_slice()]
52pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
53
54#[distributed_slice()]
55pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
56
57#[distributed_slice()]
58pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
59
60#[distributed_slice()]
61pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> RedisResult<()>] = [..];
62
63extern "C" fn cron_callback(
64 ctx: *mut raw::RedisModuleCtx,
65 _eid: raw::RedisModuleEvent,
66 _subevent: u64,
67 data: *mut ::std::os::raw::c_void,
68) {
69 let data: &raw::RedisModuleConfigChangeV1 =
70 unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
71 let ctx = Context::new(ctx);
72 CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
73 callback(&ctx, data.version);
74 });
75}
76
77extern "C" fn role_changed_callback(
78 ctx: *mut raw::RedisModuleCtx,
79 _eid: raw::RedisModuleEvent,
80 subevent: u64,
81 _data: *mut ::std::os::raw::c_void,
82) {
83 let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
84 ServerRole::Primary
85 } else {
86 ServerRole::Replica
87 };
88 let ctx = Context::new(ctx);
89 ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
90 callback(&ctx, new_role);
91 });
92}
93
94extern "C" fn loading_event_callback(
95 ctx: *mut raw::RedisModuleCtx,
96 _eid: raw::RedisModuleEvent,
97 subevent: u64,
98 _data: *mut ::std::os::raw::c_void,
99) {
100 let loading_sub_event = match subevent {
101 raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
102 raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
103 raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
104 raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
105 _ => LoadingSubevent::Failed,
106 };
107 let ctx = Context::new(ctx);
108 LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
109 callback(&ctx, loading_sub_event);
110 });
111}
112
113extern "C" fn flush_event_callback(
114 ctx: *mut raw::RedisModuleCtx,
115 _eid: raw::RedisModuleEvent,
116 subevent: u64,
117 _data: *mut ::std::os::raw::c_void,
118) {
119 let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
120 FlushSubevent::Started
121 } else {
122 FlushSubevent::Ended
123 };
124 let ctx = Context::new(ctx);
125 FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
126 callback(&ctx, flush_sub_event);
127 });
128}
129
130extern "C" fn module_change_event_callback(
131 ctx: *mut raw::RedisModuleCtx,
132 _eid: raw::RedisModuleEvent,
133 subevent: u64,
134 _data: *mut ::std::os::raw::c_void,
135) {
136 let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
137 ModuleChangeSubevent::Loaded
138 } else {
139 ModuleChangeSubevent::Unloaded
140 };
141 let ctx = Context::new(ctx);
142 MODULE_CHANGED_SERVER_EVENTS_LIST
143 .iter()
144 .for_each(|callback| {
145 callback(&ctx, module_changed_sub_event);
146 });
147}
148
149extern "C" fn config_change_event_callback(
150 ctx: *mut raw::RedisModuleCtx,
151 _eid: raw::RedisModuleEvent,
152 _subevent: u64,
153 data: *mut ::std::os::raw::c_void,
154) {
155 let data: &raw::RedisModuleConfigChange =
156 unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
157 let config_names: Vec<_> = (0..data.num_changes)
158 .map(|i| unsafe {
159 let name = *data.config_names.offset(i as isize);
160 CStr::from_ptr(name)
161 })
162 .collect();
163 let config_names: Vec<_> = config_names
164 .iter()
165 .map(|v| {
166 v.to_str()
167 .expect("Got a configuration name which is not a valid utf8")
168 })
169 .collect();
170 let ctx = Context::new(ctx);
171 CONFIG_CHANGED_SERVER_EVENTS_LIST
172 .iter()
173 .for_each(|callback| {
174 callback(&ctx, config_names.as_slice());
175 });
176}
177
178fn register_single_server_event_type<T>(
179 ctx: &Context,
180 callbacks: &[fn(&Context, T)],
181 server_event: u64,
182 inner_callback: raw::RedisModuleEventCallback,
183) -> Result<(), RedisError> {
184 if !callbacks.is_empty() {
185 let res = unsafe {
186 raw::RedisModule_SubscribeToServerEvent.unwrap()(
187 ctx.ctx,
188 raw::RedisModuleEvent {
189 id: server_event,
190 dataver: 1,
191 },
192 inner_callback,
193 )
194 };
195 if res != raw::REDISMODULE_OK as i32 {
196 return Err(RedisError::Str("Failed subscribing to server event"));
197 }
198 }
199
200 Ok(())
201}
202
203pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> {
204 register_single_server_event_type(
205 ctx,
206 &ROLE_CHANGED_SERVER_EVENTS_LIST,
207 raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
208 Some(role_changed_callback),
209 )?;
210 register_single_server_event_type(
211 ctx,
212 &LOADING_SERVER_EVENTS_LIST,
213 raw::REDISMODULE_EVENT_LOADING,
214 Some(loading_event_callback),
215 )?;
216 register_single_server_event_type(
217 ctx,
218 &FLUSH_SERVER_EVENTS_LIST,
219 raw::REDISMODULE_EVENT_FLUSHDB,
220 Some(flush_event_callback),
221 )?;
222 register_single_server_event_type(
223 ctx,
224 &MODULE_CHANGED_SERVER_EVENTS_LIST,
225 raw::REDISMODULE_EVENT_MODULE_CHANGE,
226 Some(module_change_event_callback),
227 )?;
228 register_single_server_event_type(
229 ctx,
230 &CONFIG_CHANGED_SERVER_EVENTS_LIST,
231 raw::REDISMODULE_EVENT_CONFIG,
232 Some(config_change_event_callback),
233 )?;
234 register_single_server_event_type(
235 ctx,
236 &CRON_SERVER_EVENTS_LIST,
237 raw::REDISMODULE_EVENT_CRON_LOOP,
238 Some(cron_callback),
239 )?;
240 Ok(())
241}