redis_event/lib.rs
1/*!
2* 用于监听Redis的写入操作,据此可以实现数据复制,监控等相关的应用。
3*
4* # 原理
5*
6* 此crate实现了[Redis Replication协议],在运行时,程序将以replica的身份连接到Redis,相当于Redis的一个副本。
7*
8* 所以,在程序连接上某个Redis之后,Redis会将它当前的所有数据以RDB的格式dump一份,dump完毕之后便发送过来,这个RDB中的每一条数据就对应一个[`Event`]`::RDB`事件。
9*
10* 在这之后,Redis接收到来自客户端的写入操作(即Redis命令)后,也会将这个写入操作传播给它的replica,每一个写入操作就对应一个[`Event`]`::AOF`事件。
11*
12* # 示例
13*
14* ```no_run
15* use std::net::{IpAddr, SocketAddr};
16* use std::sync::atomic::AtomicBool;
17* use std::sync::Arc;
18* use std::str::FromStr;
19* use std::rc::Rc;
20* use std::cell::RefCell;
21* use redis_event::listener;
22* use redis_event::config::Config;
23* use redis_event::{NoOpEventHandler, RedisListener};
24*
25* fn main() -> std::io::Result<()> {
26* let host = String::from("127.0.0.1");
27* let port = 6379;
28*
29* let conf = Config {
30* is_discard_rdb: false, // 不跳过RDB
31* is_aof: false, // 不处理AOF
32* host,
33* port,
34* username: String::new(), // 用户名为空
35* password: String::new(), // 密码为空
36* repl_id: String::from("?"), // replication id,若无此id,设置为?即可
37* repl_offset: -1, // replication offset,若无此offset,设置为-1即可
38* read_timeout: None, // None,即读取永不超时
39* write_timeout: None, // None,即写入永不超时
40* is_tls_enabled: false, // 不启用TLS
41* is_tls_insecure: false, // 未启用TLS,设置为false即可
42* identity: None, // 未启用TLS,设置为None即可
43* identity_passwd: None // 未启用TLS,设置为None即可
44* };
45* let running = Arc::new(AtomicBool::new(true));
46*
47* let mut builder = listener::Builder::new();
48* builder.with_config(conf);
49* // 设置控制变量, 通过此变量在外界中断`redis_event`内部的逻辑
50* builder.with_control_flag(running);
51* // 设置事件处理器
52* builder.with_event_handler(Rc::new(RefCell::new(NoOpEventHandler{})));
53*
54* let mut redis_listener = builder.build();
55* // 启动程序
56* redis_listener.start()?;
57* Ok(())
58* }
59* ```
60*
61* [Redis Replication协议]: https://redis.io/topics/replication
62* [`Event`]: enum.Event.html
63*/
64
65use std::io::{Read, Result};
66
67use crate::cmd::Command;
68use crate::rdb::{Module, Object};
69
70pub mod cmd;
71pub mod config;
72mod io;
73mod iter;
74pub mod listener;
75mod lzf;
76pub mod rdb;
77pub mod resp;
78mod tests;
79
80/// Redis事件监听器的定义,所有类型的监听器都实现此接口
81pub trait RedisListener {
82 /// 开启事件监听
83 fn start(&mut self) -> Result<()>;
84}
85
86/// Redis RDB 解析器定义
87pub trait RDBParser {
88 /// 解析RDB的具体实现
89 ///
90 /// 方法参数:
91 ///
92 /// * `input`: RDB输入流
93 /// * `length`: RDB的总长度
94 /// * `event_handler`: Redis事件处理器
95 fn parse(&mut self, input: &mut dyn Read, length: i64, event_handler: &mut dyn EventHandler) -> Result<()>;
96}
97
98/// Redis事件
99pub enum Event<'a> {
100 /// RDB事件
101 ///
102 /// 当开启`RedisListener`之后,Redis会将此刻内存中的数据dump出来(以rdb的格式进行dump),
103 /// dump完毕之后的rdb数据便会发送给`RedisListener`,此rdb中的数据即对应此事件
104 RDB(Object<'a>),
105 /// AOF事件
106 ///
107 /// 在上面rdb数据处理完毕之后,客户端对Redis的数据写入操作将会发送给`RedisListener`,
108 /// 此写入操作即对应此事件
109 AOF(Command<'a>),
110}
111
112/// Redis事件处理器的定义,所有类型的处理器都必须实现此接口
113pub trait EventHandler {
114 fn handle(&mut self, event: Event);
115}
116
117/// 对于接收到的Redis事件不做任何处理
118pub struct NoOpEventHandler {}
119
120impl EventHandler for NoOpEventHandler {
121 fn handle(&mut self, _: Event) {}
122}
123
124/// Module Parser
125pub trait ModuleParser {
126 /// 解析Module的具体实现
127 ///
128 /// 方法参数:
129 ///
130 /// * `input`: RDB输入流
131 /// * `module_name`: Module的名字
132 /// * `module_version`: Module的版本
133 fn parse(&mut self, input: &mut dyn Read, module_name: &str, module_version: usize) -> Box<dyn Module>;
134}
135
136/// 转换为utf-8字符串,不验证正确性
137fn to_string(bytes: Vec<u8>) -> String {
138 return unsafe { String::from_utf8_unchecked(bytes) };
139}