redis_module/context/
server_events.rs

1use std::ffi::CStr;
2
3use crate::{context::Context, RedisError};
4use crate::{raw, InfoContext, RedisResult};
5use linkme::distributed_slice;
6
/// Replication role of the server, as reported to role-change handlers.
///
/// `role_changed_callback` produces `Primary` when the subevent equals
/// `REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER` and `Replica` for every
/// other subevent value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ServerRole {
    /// The server became (or is) the primary.
    Primary,
    /// The server is a replica of another instance.
    Replica,
}
12
/// Stage of a data-loading operation, as reported to loading handlers.
///
/// The mapping from raw subevent ids is done in `loading_event_callback`;
/// any id that is not one of the start/ended constants is reported as
/// [`LoadingSubevent::Failed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LoadingSubevent {
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_RDB_START`.
    RdbStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_AOF_START`.
    AofStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_REPL_START`.
    ReplStarted,
    /// Mapped from `REDISMODULE_SUBEVENT_LOADING_ENDED`.
    Ended,
    /// Reported for every other subevent id.
    Failed,
}
21
/// Stage of a flush operation, as reported to flush handlers.
///
/// `flush_event_callback` produces `Started` when the subevent equals
/// `REDISMODULE_SUBEVENT_FLUSHDB_START` and `Ended` for every other value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FlushSubevent {
    /// The flush is about to begin.
    Started,
    /// Reported for any subevent other than the start one.
    Ended,
}
27
/// Kind of module change, as reported to module-change handlers.
///
/// `module_change_event_callback` produces `Loaded` when the subevent equals
/// `REDISMODULE_SUBEVENT_MODULE_LOADED` and `Unloaded` for every other value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ModuleChangeSubevent {
    /// A module was loaded.
    Loaded,
    /// Reported for any subevent other than the loaded one.
    Unloaded,
}
33
34#[derive(Clone)]
35pub enum ServerEventHandler {
36    RuleChanged(fn(&Context, ServerRole)),
37    Loading(fn(&Context, LoadingSubevent)),
38    Flush(fn(&Context, FlushSubevent)),
39    ModuleChange(fn(&Context, ModuleChangeSubevent)),
40}
41
42#[distributed_slice()]
43pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];
44
45#[distributed_slice()]
46pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];
47
48#[distributed_slice()]
49pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
50
51#[distributed_slice()]
52pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];
53
54#[distributed_slice()]
55pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
56
57#[distributed_slice()]
58pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
59
60#[distributed_slice()]
61pub static INFO_COMMAND_HANDLER_LIST: [fn(&InfoContext, bool) -> RedisResult<()>] = [..];
62
63extern "C" fn cron_callback(
64    ctx: *mut raw::RedisModuleCtx,
65    _eid: raw::RedisModuleEvent,
66    _subevent: u64,
67    data: *mut ::std::os::raw::c_void,
68) {
69    let data: &raw::RedisModuleConfigChangeV1 =
70        unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
71    let ctx = Context::new(ctx);
72    CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
73        callback(&ctx, data.version);
74    });
75}
76
77extern "C" fn role_changed_callback(
78    ctx: *mut raw::RedisModuleCtx,
79    _eid: raw::RedisModuleEvent,
80    subevent: u64,
81    _data: *mut ::std::os::raw::c_void,
82) {
83    let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
84        ServerRole::Primary
85    } else {
86        ServerRole::Replica
87    };
88    let ctx = Context::new(ctx);
89    ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
90        callback(&ctx, new_role);
91    });
92}
93
94extern "C" fn loading_event_callback(
95    ctx: *mut raw::RedisModuleCtx,
96    _eid: raw::RedisModuleEvent,
97    subevent: u64,
98    _data: *mut ::std::os::raw::c_void,
99) {
100    let loading_sub_event = match subevent {
101        raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
102        raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
103        raw::REDISMODULE_SUBEVENT_LOADING_AOF_START => LoadingSubevent::AofStarted,
104        raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
105        _ => LoadingSubevent::Failed,
106    };
107    let ctx = Context::new(ctx);
108    LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
109        callback(&ctx, loading_sub_event);
110    });
111}
112
113extern "C" fn flush_event_callback(
114    ctx: *mut raw::RedisModuleCtx,
115    _eid: raw::RedisModuleEvent,
116    subevent: u64,
117    _data: *mut ::std::os::raw::c_void,
118) {
119    let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
120        FlushSubevent::Started
121    } else {
122        FlushSubevent::Ended
123    };
124    let ctx = Context::new(ctx);
125    FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
126        callback(&ctx, flush_sub_event);
127    });
128}
129
130extern "C" fn module_change_event_callback(
131    ctx: *mut raw::RedisModuleCtx,
132    _eid: raw::RedisModuleEvent,
133    subevent: u64,
134    _data: *mut ::std::os::raw::c_void,
135) {
136    let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
137        ModuleChangeSubevent::Loaded
138    } else {
139        ModuleChangeSubevent::Unloaded
140    };
141    let ctx = Context::new(ctx);
142    MODULE_CHANGED_SERVER_EVENTS_LIST
143        .iter()
144        .for_each(|callback| {
145            callback(&ctx, module_changed_sub_event);
146        });
147}
148
149extern "C" fn config_change_event_callback(
150    ctx: *mut raw::RedisModuleCtx,
151    _eid: raw::RedisModuleEvent,
152    _subevent: u64,
153    data: *mut ::std::os::raw::c_void,
154) {
155    let data: &raw::RedisModuleConfigChange =
156        unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
157    let config_names: Vec<_> = (0..data.num_changes)
158        .map(|i| unsafe {
159            let name = *data.config_names.offset(i as isize);
160            CStr::from_ptr(name)
161        })
162        .collect();
163    let config_names: Vec<_> = config_names
164        .iter()
165        .map(|v| {
166            v.to_str()
167                .expect("Got a configuration name which is not a valid utf8")
168        })
169        .collect();
170    let ctx = Context::new(ctx);
171    CONFIG_CHANGED_SERVER_EVENTS_LIST
172        .iter()
173        .for_each(|callback| {
174            callback(&ctx, config_names.as_slice());
175        });
176}
177
178fn register_single_server_event_type<T>(
179    ctx: &Context,
180    callbacks: &[fn(&Context, T)],
181    server_event: u64,
182    inner_callback: raw::RedisModuleEventCallback,
183) -> Result<(), RedisError> {
184    if !callbacks.is_empty() {
185        let res = unsafe {
186            raw::RedisModule_SubscribeToServerEvent.unwrap()(
187                ctx.ctx,
188                raw::RedisModuleEvent {
189                    id: server_event,
190                    dataver: 1,
191                },
192                inner_callback,
193            )
194        };
195        if res != raw::REDISMODULE_OK as i32 {
196            return Err(RedisError::Str("Failed subscribing to server event"));
197        }
198    }
199
200    Ok(())
201}
202
203pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> {
204    register_single_server_event_type(
205        ctx,
206        &ROLE_CHANGED_SERVER_EVENTS_LIST,
207        raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
208        Some(role_changed_callback),
209    )?;
210    register_single_server_event_type(
211        ctx,
212        &LOADING_SERVER_EVENTS_LIST,
213        raw::REDISMODULE_EVENT_LOADING,
214        Some(loading_event_callback),
215    )?;
216    register_single_server_event_type(
217        ctx,
218        &FLUSH_SERVER_EVENTS_LIST,
219        raw::REDISMODULE_EVENT_FLUSHDB,
220        Some(flush_event_callback),
221    )?;
222    register_single_server_event_type(
223        ctx,
224        &MODULE_CHANGED_SERVER_EVENTS_LIST,
225        raw::REDISMODULE_EVENT_MODULE_CHANGE,
226        Some(module_change_event_callback),
227    )?;
228    register_single_server_event_type(
229        ctx,
230        &CONFIG_CHANGED_SERVER_EVENTS_LIST,
231        raw::REDISMODULE_EVENT_CONFIG,
232        Some(config_change_event_callback),
233    )?;
234    register_single_server_event_type(
235        ctx,
236        &CRON_SERVER_EVENTS_LIST,
237        raw::REDISMODULE_EVENT_CRON_LOOP,
238        Some(cron_callback),
239    )?;
240    Ok(())
241}