1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
/*! * 用于监听Redis的写入操作,据此可以实现数据复制,监控等相关的应用。 * * # 原理 * * 此crate实现了[Redis Replication协议],在运行时,程序将以replica的身份连接到Redis,相当于Redis的一个副本。 * * 所以,在程序连接上某个Redis之后,Redis会将它当前的所有数据以RDB的格式dump一份,dump完毕之后便发送过来,这个RDB中的每一条数据就对应一个[`Event`]`::RDB`事件。 * * 在这之后,Redis接收到来自客户端的写入操作(即Redis命令)后,也会将这个写入操作传播给它的replica,每一个写入操作就对应一个[`Event`]`::AOF`事件。 * * # 示例 * * ```no_run * use std::net::{IpAddr, SocketAddr}; * use std::sync::atomic::AtomicBool; * use std::sync::Arc; * use std::str::FromStr; * use std::rc::Rc; * use std::cell::RefCell; * use redis_event::listener; * use redis_event::config::Config; * use redis_event::{NoOpEventHandler, RedisListener}; * * fn main() -> std::io::Result<()> { * let host = String::from("127.0.0.1"); * let port = 6379; * * let conf = Config { * is_discard_rdb: false, // 不跳过RDB * is_aof: false, // 不处理AOF * host, * port, * username: String::new(), // 用户名为空 * password: String::new(), // 密码为空 * repl_id: String::from("?"), // replication id,若无此id,设置为?即可 * repl_offset: -1, // replication offset,若无此offset,设置为-1即可 * read_timeout: None, // None,即读取永不超时 * write_timeout: None, // None,即写入永不超时 * is_tls_enabled: false, // 不启用TLS * is_tls_insecure: false, // 未启用TLS,设置为false即可 * identity: None, // 未启用TLS,设置为None即可 * identity_passwd: None // 未启用TLS,设置为None即可 * }; * let running = Arc::new(AtomicBool::new(true)); * * let mut builder = listener::Builder::new(); * builder.with_config(conf); * // 设置控制变量, 通过此变量在外界中断`redis_event`内部的逻辑 * builder.with_control_flag(running); * // 设置事件处理器 * builder.with_event_handler(Rc::new(RefCell::new(NoOpEventHandler{}))); * * let mut redis_listener = builder.build(); * // 启动程序 * redis_listener.start()?; * Ok(()) * } * ``` * * [Redis Replication协议]: https://redis.io/topics/replication * [`Event`]: enum.Event.html */ use std::io::{Read, Result}; use crate::cmd::Command; use crate::rdb::{Module, Object}; pub mod cmd; pub mod config; mod io; mod iter; pub mod listener; mod lzf; pub mod rdb; pub mod resp; mod tests; /// Redis事件监听器的定义,所有类型的监听器都实现此接口 pub trait RedisListener { /// 开启事件监听 fn start(&mut self) -> Result<()>; } /// Redis RDB 解析器定义 pub trait RDBParser { /// 解析RDB的具体实现 /// /// 方法参数: /// /// * `input`: RDB输入流 /// * `length`: RDB的总长度 /// * `event_handler`: Redis事件处理器 fn parse( &mut self, input: &mut dyn Read, length: i64, event_handler: &mut dyn EventHandler, ) -> Result<()>; } /// Redis事件 pub enum Event<'a> { /// RDB事件 /// /// 当开启`RedisListener`之后,Redis会将此刻内存中的数据dump出来(以rdb的格式进行dump), /// dump完毕之后的rdb数据便会发送给`RedisListener`,此rdb中的数据即对应此事件 RDB(Object<'a>), /// AOF事件 /// /// 在上面rdb数据处理完毕之后,客户端对Redis的数据写入操作将会发送给`RedisListener`, /// 此写入操作即对应此事件 AOF(Command<'a>), } /// Redis事件处理器的定义,所有类型的处理器都必须实现此接口 pub trait EventHandler { fn handle(&mut self, event: Event); } /// 对于接收到的Redis事件不做任何处理 pub struct NoOpEventHandler {} impl EventHandler for NoOpEventHandler { fn handle(&mut self, _: Event) {} } /// Module Parser pub trait ModuleParser { /// 解析Module的具体实现 /// /// 方法参数: /// /// * `input`: RDB输入流 /// * `module_name`: Module的名字 /// * `module_version`: Module的版本 fn parse( &mut self, input: &mut dyn Read, module_name: &str, module_version: usize, ) -> Box<dyn Module>; } /// 转换为utf-8字符串,不验证正确性 fn to_string(bytes: Vec<u8>) -> String { return unsafe { String::from_utf8_unchecked(bytes) }; }