redis_event/
lib.rs

1/*!
2* 用于监听Redis的写入操作,据此可以实现数据复制,监控等相关的应用。
3*
4* # 原理
5*
6* 此crate实现了[Redis Replication协议],在运行时,程序将以replica的身份连接到Redis,相当于Redis的一个副本。
7*
8* 所以,在程序连接上某个Redis之后,Redis会将它当前的所有数据以RDB的格式dump一份,dump完毕之后便发送过来,这个RDB中的每一条数据就对应一个[`Event`]`::RDB`事件。
9*
10* 在这之后,Redis接收到来自客户端的写入操作(即Redis命令)后,也会将这个写入操作传播给它的replica,每一个写入操作就对应一个[`Event`]`::AOF`事件。
11*
12* # 示例
13*
14* ```no_run
15* use std::net::{IpAddr, SocketAddr};
16* use std::sync::atomic::AtomicBool;
17* use std::sync::Arc;
18* use std::str::FromStr;
19* use std::rc::Rc;
20* use std::cell::RefCell;
21* use redis_event::listener;
22* use redis_event::config::Config;
23* use redis_event::{NoOpEventHandler, RedisListener};
24*
25* fn main() -> std::io::Result<()> {
26*     let host = String::from("127.0.0.1");
27*     let port = 6379;
28*
29*     let conf = Config {
30*         is_discard_rdb: false,            // 不跳过RDB
31*         is_aof: false,                    // 不处理AOF
32*         host,
33*         port,
34*         username: String::new(),          // 用户名为空
35*         password: String::new(),          // 密码为空
36*         repl_id: String::from("?"),       // replication id,若无此id,设置为?即可
37*         repl_offset: -1,                  // replication offset,若无此offset,设置为-1即可
38*         read_timeout: None,               // None,即读取永不超时
39*         write_timeout: None,              // None,即写入永不超时
40*         is_tls_enabled: false,            // 不启用TLS
41*         is_tls_insecure: false,           // 未启用TLS,设置为false即可
42*         identity: None,                   // 未启用TLS,设置为None即可
43*         identity_passwd: None             // 未启用TLS,设置为None即可
44*     };
45*     let running = Arc::new(AtomicBool::new(true));
46*
47*     let mut builder = listener::Builder::new();
48*     builder.with_config(conf);
49*     // 设置控制变量, 通过此变量在外界中断`redis_event`内部的逻辑
50*     builder.with_control_flag(running);
51*     // 设置事件处理器
52*     builder.with_event_handler(Rc::new(RefCell::new(NoOpEventHandler{})));
53*
54*     let mut redis_listener = builder.build();
55*     // 启动程序
56*     redis_listener.start()?;
57*     Ok(())
58* }
59* ```
60*
61* [Redis Replication协议]: https://redis.io/topics/replication
62* [`Event`]: enum.Event.html
63*/
64
65use std::io::{Read, Result};
66
67use crate::cmd::Command;
68use crate::rdb::{Module, Object};
69
70pub mod cmd;
71pub mod config;
72mod io;
73mod iter;
74pub mod listener;
75mod lzf;
76pub mod rdb;
77pub mod resp;
78mod tests;
79
80/// Redis事件监听器的定义,所有类型的监听器都实现此接口
81pub trait RedisListener {
82    /// 开启事件监听
83    fn start(&mut self) -> Result<()>;
84}
85
86/// Redis RDB 解析器定义
87pub trait RDBParser {
88    /// 解析RDB的具体实现
89    ///
90    /// 方法参数:
91    ///
92    /// * `input`: RDB输入流
93    /// * `length`: RDB的总长度
94    /// * `event_handler`: Redis事件处理器
95    fn parse(&mut self, input: &mut dyn Read, length: i64, event_handler: &mut dyn EventHandler) -> Result<()>;
96}
97
98/// Redis事件
99pub enum Event<'a> {
100    /// RDB事件
101    ///
102    /// 当开启`RedisListener`之后,Redis会将此刻内存中的数据dump出来(以rdb的格式进行dump),
103    /// dump完毕之后的rdb数据便会发送给`RedisListener`,此rdb中的数据即对应此事件
104    RDB(Object<'a>),
105    /// AOF事件
106    ///
107    /// 在上面rdb数据处理完毕之后,客户端对Redis的数据写入操作将会发送给`RedisListener`,
108    /// 此写入操作即对应此事件
109    AOF(Command<'a>),
110}
111
112/// Redis事件处理器的定义,所有类型的处理器都必须实现此接口
113pub trait EventHandler {
114    fn handle(&mut self, event: Event);
115}
116
117/// 对于接收到的Redis事件不做任何处理
118pub struct NoOpEventHandler {}
119
120impl EventHandler for NoOpEventHandler {
121    fn handle(&mut self, _: Event) {}
122}
123
124/// Module Parser
125pub trait ModuleParser {
126    /// 解析Module的具体实现
127    ///
128    /// 方法参数:
129    ///
130    /// * `input`: RDB输入流
131    /// * `module_name`: Module的名字
132    /// * `module_version`: Module的版本
133    fn parse(&mut self, input: &mut dyn Read, module_name: &str, module_version: usize) -> Box<dyn Module>;
134}
135
136/// 转换为utf-8字符串,不验证正确性
137fn to_string(bytes: Vec<u8>) -> String {
138    return unsafe { String::from_utf8_unchecked(bytes) };
139}