redis_event/
listener.rs

1/*!
2[`RedisListener`]接口的具体实现
3
4[`RedisListener`]: trait.RedisListener.html
5*/
6use std::cell::RefCell;
7use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result, Write};
8use std::net::TcpStream;
9use std::ops::DerefMut;
10use std::rc::Rc;
11use std::result::Result::Ok;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{mpsc, Arc};
14use std::thread;
15use std::thread::sleep;
16use std::time::{Duration, Instant};
17
18use log::{error, info, warn};
19use native_tls::{Identity, TlsConnector, TlsStream};
20
21use crate::config::Config;
22use crate::io::send;
23use crate::rdb::DefaultRDBParser;
24use crate::resp::{Resp, RespDecode, Type};
25use crate::{cmd, io, EventHandler, ModuleParser, NoOpEventHandler, RDBParser, RedisListener};
26use std::fs::File;
27
/// Listens to the events of a single Redis instance.
pub struct Listener {
    /// Connection and replication settings. `repl_id` and `repl_offset` are
    /// updated in place while replication progresses.
    pub config: Config,
    /// Stream to the server; `None` until `connect` has stored one.
    conn: Option<Stream>,
    /// Parser that the RDB payload of a full synchronisation is fed to.
    rdb_parser: Rc<RefCell<dyn RDBParser>>,
    /// Handler passed to the RDB parser and to `cmd::parse`.
    event_handler: Rc<RefCell<dyn EventHandler>>,
    /// Join handle of the heartbeat thread, when one has been spawned.
    heartbeat_thread: HeartbeatWorker,
    /// Channel to the heartbeat thread; `None` until `start_heartbeat` spawns it.
    sender: Option<mpsc::Sender<Message>>,
    /// Shared flag checked by the receive loop and by `is_running`.
    running: Arc<AtomicBool>,
    /// Local IP of the socket, recorded by `connect`.
    local_ip: Option<String>,
    /// Local port of the socket, recorded by `connect`.
    local_port: Option<u16>,
}
40
41impl Listener {
42    /// 连接Redis,创建TCP连接
43    fn connect(&mut self) -> Result<()> {
44        let addr = format!("{}:{}", &self.config.host, self.config.port);
45        let stream = TcpStream::connect(&addr)?;
46        stream
47            .set_read_timeout(self.config.read_timeout)
48            .expect("read timeout set failed");
49        stream
50            .set_write_timeout(self.config.write_timeout)
51            .expect("write timeout set failed");
52
53        let socket_addr = stream.local_addr().unwrap();
54        let local_ip = socket_addr.ip().to_string();
55        self.local_ip = Some(local_ip);
56
57        let local_port = socket_addr.port();
58        self.local_port = Some(local_port);
59
60        if self.config.is_tls_enabled {
61            let mut builder = TlsConnector::builder();
62            builder.danger_accept_invalid_hostnames(self.config.is_tls_insecure);
63            builder.danger_accept_invalid_certs(self.config.is_tls_insecure);
64
65            if let Some(id) = &self.config.identity {
66                let mut file = File::open(id)?;
67                let mut buff = Vec::new();
68                file.read_to_end(&mut buff)?;
69                let identity_passwd = match &self.config.identity_passwd {
70                    None => "",
71                    Some(passwd) => passwd.as_str(),
72                };
73                let identity = Identity::from_pkcs12(&buff, identity_passwd).expect("解析key失败");
74                builder.identity(identity);
75            }
76
77            let connector = builder.build().unwrap();
78            let tls_stream = connector
79                .connect(&self.config.host, stream)
80                .expect("TLS connect failed");
81            self.conn = Option::Some(Stream::Tls(tls_stream));
82        } else {
83            self.conn = Option::Some(Stream::Tcp(stream));
84        }
85        info!("Connected to server {}", &addr);
86        Ok(())
87    }
88
89    /// 如果有设置密码,将尝试使用此密码进行认证
90    fn auth(&mut self) -> Result<()> {
91        if !self.config.password.is_empty() {
92            let mut args = Vec::with_capacity(2);
93            if !self.config.username.is_empty() {
94                args.push(self.config.username.as_bytes());
95            }
96            args.push(self.config.password.as_bytes());
97            let conn = self.conn.as_mut().unwrap();
98            let conn: &mut dyn Read = match conn {
99                Stream::Tcp(tcp_stream) => {
100                    send(tcp_stream, b"AUTH", &args)?;
101                    tcp_stream
102                }
103                Stream::Tls(tls_stream) => {
104                    send(tls_stream, b"AUTH", &args)?;
105                    tls_stream
106                }
107            };
108            conn.decode_resp()?;
109        }
110        Ok(())
111    }
112
113    /// 发送replica相关信息到redis,此端口展现在`info replication`中
114    fn send_replica_info(&mut self) -> Result<()> {
115        let port = self.local_port.unwrap().to_string();
116        let port = port.as_bytes();
117
118        let ip = self.local_ip.as_ref().unwrap();
119        let ip = ip.as_bytes();
120
121        let conn = self.conn.as_mut().unwrap();
122        match conn {
123            Stream::Tcp(tcp_stream) => Listener::de_send_replica_info(&port, &ip, tcp_stream)?,
124            Stream::Tls(tls_stream) => Listener::de_send_replica_info(&port, &ip, tls_stream)?,
125        };
126        Ok(())
127    }
128
129    fn de_send_replica_info<T: Write + Read>(port: &&[u8], ip: &&[u8], tcp_stream: &mut T) -> Result<()> {
130        info!("PING");
131        send(tcp_stream, b"PING", &vec![])?;
132        Listener::reply(tcp_stream)?;
133
134        info!("REPLCONF listening-port {}", String::from_utf8_lossy(*port));
135        send(tcp_stream, b"REPLCONF", &[b"listening-port", port])?;
136        Listener::reply(tcp_stream)?;
137
138        info!("REPLCONF ip-address {}", String::from_utf8_lossy(*ip));
139        send(tcp_stream, b"REPLCONF", &[b"ip-address", ip])?;
140        Listener::reply(tcp_stream)?;
141
142        info!("REPLCONF capa eof");
143        send(tcp_stream, b"REPLCONF", &[b"capa", b"eof"])?;
144        Listener::reply(tcp_stream)?;
145
146        info!("REPLCONF capa psync2");
147        send(tcp_stream, b"REPLCONF", &[b"capa", b"psync2"])?;
148        Listener::reply(tcp_stream)
149    }
150
151    fn reply<T: Read>(tcp_stream: &mut T) -> Result<()> {
152        match tcp_stream.decode_resp()? {
153            Resp::String(str) => info!("{}", str),
154            Resp::Error(err) => {
155                warn!("{}", &err);
156                if (err.contains("NOAUTH") || err.contains("NOPERM"))
157                    && !err.contains("no password")
158                    && !err.contains("Unrecognized REPLCONF option")
159                {
160                    return Err(Error::new(ErrorKind::InvalidData, err));
161                }
162            }
163            _ => panic!("Unexpected response type"),
164        }
165        Ok(())
166    }
167
168    /// 开启replication
169    /// 默认使用PSYNC命令,若不支持PSYNC则尝试使用SYNC命令
170    fn start_sync(&mut self) -> Result<Mode> {
171        let (next_step, mut length) = self.psync()?;
172        match next_step {
173            NextStep::FullSync | NextStep::ChangeMode => {
174                let mode;
175                if let NextStep::ChangeMode = next_step {
176                    info!("源Redis不支持PSYNC命令, 使用SYNC命令再次进行尝试");
177                    mode = Mode::Sync;
178                    length = self.sync()?;
179                } else {
180                    mode = Mode::PSync;
181                }
182                if length != -1 {
183                    info!("Full Sync, size: {}bytes", length);
184                } else {
185                    info!("Disk-less replication.");
186                }
187                let conn = self.conn.as_mut().unwrap();
188
189                let conn: &mut dyn Read = match conn {
190                    Stream::Tcp(tcp_stream) => tcp_stream,
191                    Stream::Tls(tls_stream) => tls_stream,
192                };
193                let mut reader = BufReader::new(conn);
194                reader.fill_buf()?;
195                if length != -1 && self.config.is_discard_rdb {
196                    info!("跳过RDB不进行处理");
197                    io::skip(&mut reader, length as isize)?;
198                } else {
199                    let mut event_handler = self.event_handler.borrow_mut();
200                    let mut rdb_parser = self.rdb_parser.borrow_mut();
201                    rdb_parser.parse(&mut reader, length, event_handler.deref_mut())?;
202                    if length == -1 {
203                        io::skip(&mut reader, 40)?;
204                    }
205                }
206                Ok(mode)
207            }
208            NextStep::PartialResync => {
209                info!("PSYNC进度恢复");
210                Ok(Mode::PSync)
211            }
212            NextStep::Wait => Ok(Mode::Wait),
213        }
214    }
215
216    fn psync(&mut self) -> Result<(NextStep, i64)> {
217        let offset = self.config.repl_offset.to_string();
218        let repl_offset = offset.as_bytes();
219        let repl_id = self.config.repl_id.as_bytes();
220
221        let conn = self.conn.as_mut().unwrap();
222        let conn: &mut dyn Read = match conn {
223            Stream::Tcp(tcp_stream) => {
224                send(tcp_stream, b"PSYNC", &[repl_id, repl_offset])?;
225
226                tcp_stream
227            }
228            Stream::Tls(tls_stream) => {
229                send(tls_stream, b"PSYNC", &[repl_id, repl_offset])?;
230                tls_stream
231            }
232        };
233
234        match conn.decode_resp() {
235            Ok(response) => {
236                if let Resp::String(resp) = &response {
237                    info!("{}", resp);
238                    if resp.starts_with("FULLRESYNC") {
239                        let mut iter = resp.split_whitespace();
240                        if let Some(repl_id) = iter.nth(1) {
241                            self.config.repl_id = repl_id.to_owned();
242                        } else {
243                            panic!("Expect replication id, but got None");
244                        }
245                        if let Some(repl_offset) = iter.next() {
246                            self.config.repl_offset = repl_offset.parse::<i64>().unwrap();
247                        } else {
248                            panic!("Expect replication offset, but got None");
249                        }
250                        info!("等待Redis dump完成...");
251                        if let Type::BulkString = conn.decode_type()? {
252                            let reply = conn.decode_string()?;
253                            if reply.starts_with("EOF") {
254                                return Ok((NextStep::FullSync, -1));
255                            } else {
256                                let length = reply.parse::<i64>().unwrap();
257                                return Ok((NextStep::FullSync, length));
258                            }
259                        } else {
260                            panic!("Expect BulkString response");
261                        }
262                    } else if resp.starts_with("CONTINUE") {
263                        let mut iter = resp.split_whitespace();
264                        if let Some(repl_id) = iter.nth(1) {
265                            if !repl_id.eq(&self.config.repl_id) {
266                                self.config.repl_id = repl_id.to_owned();
267                            }
268                        }
269                        return Ok((NextStep::PartialResync, -1));
270                    } else if resp.starts_with("NOMASTERLINK") {
271                        return Ok((NextStep::Wait, -1));
272                    } else if resp.starts_with("LOADING") {
273                        return Ok((NextStep::Wait, -1));
274                    }
275                }
276                panic!("Unexpected Response: {:?}", response);
277            }
278            Err(error) => {
279                if error.to_string().eq("ERR unknown command 'PSYNC'") {
280                    return Ok((NextStep::ChangeMode, -1));
281                } else {
282                    return Err(error);
283                }
284            }
285        }
286    }
287
288    fn sync(&mut self) -> Result<i64> {
289        let conn = self.conn.as_mut().unwrap();
290        let conn: &mut dyn Read = match conn {
291            Stream::Tcp(tcp_stream) => {
292                send(tcp_stream, b"SYNC", &vec![])?;
293                tcp_stream
294            }
295            Stream::Tls(tls_stream) => {
296                send(tls_stream, b"SYNC", &vec![])?;
297                tls_stream
298            }
299        };
300        if let Type::BulkString = conn.decode_type()? {
301            if let Resp::Int(length) = conn.decode_int()? {
302                return Ok(length);
303            } else {
304                panic!("Expect int response")
305            }
306        } else {
307            panic!("Expect BulkString response");
308        }
309    }
310
311    /// 开启心跳
312    fn start_heartbeat(&mut self, mode: &Mode) {
313        if !self.is_running() {
314            return;
315        }
316        if let Mode::Sync = mode {
317            return;
318        }
319        if self.config.is_tls_enabled {
320            return;
321        }
322        let conn = self.conn.as_ref().unwrap();
323        let conn = match conn {
324            Stream::Tcp(tcp_stream) => tcp_stream,
325            Stream::Tls(_) => panic!("Expect TcpStream"),
326        };
327        let mut conn_clone = conn.try_clone().unwrap();
328
329        let (sender, receiver) = mpsc::channel();
330
331        let t = thread::spawn(move || {
332            let mut offset = 0;
333            let mut timer = Instant::now();
334            let one_sec = Duration::from_secs(1);
335            info!("heartbeat thread started");
336            loop {
337                match receiver.recv_timeout(one_sec) {
338                    Ok(Message::Terminate) => break,
339                    Ok(Message::Some(new_offset)) => {
340                        offset = new_offset;
341                    }
342                    Err(_) => {}
343                };
344                let elapsed = timer.elapsed();
345                if elapsed.ge(&one_sec) {
346                    let offset_str = offset.to_string();
347                    let offset_bytes = offset_str.as_bytes();
348                    if let Err(error) = send(&mut conn_clone, b"REPLCONF", &[b"ACK", offset_bytes]) {
349                        error!("heartbeat error: {}", error);
350                        break;
351                    }
352                    timer = Instant::now();
353                }
354            }
355            info!("heartbeat thread terminated");
356        });
357        self.heartbeat_thread = HeartbeatWorker { thread: Some(t) };
358        self.sender = Some(sender);
359    }
360
    /// Reads the replicated command stream and hands every command to the event handler.
    ///
    /// Loops while the `running` flag is set. Each iteration decodes one RESP
    /// value, which must be an array of bulk-bytes elements, advances
    /// `config.repl_offset` by the size reported by the `CountReader`, and passes
    /// the arguments to `cmd::parse`.
    ///
    /// * Plain TCP: in `Mode::PSync` the new offset is sent over the channel in
    ///   `self.sender`; a failed channel send is only logged.
    /// * TLS: `REPLCONF ACK <offset>` is written on the same stream, at most once
    ///   per second and only after a command has been decoded. A failed write is
    ///   logged, ends the loop, and the function still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns any error from `decode_resp` or from `reader.reset()`.
    ///
    /// # Panics
    ///
    /// Panics if there is no connection, if the decoded value is not an array,
    /// if an array element is not bulk bytes, or (TCP with `Mode::PSync`) if
    /// `self.sender` is `None`.
    fn receive_aof(&mut self, mode: &Mode) -> Result<()> {
        let mut handler = self.event_handler.as_ref().borrow_mut();

        let __conn = self.conn.as_mut().unwrap();
        match __conn {
            Stream::Tcp(tcp_stream) => {
                let mut reader = io::CountReader::new(tcp_stream);

                while self.running.load(Ordering::Relaxed) {
                    // mark()/reset() bracket one command; reset() yields the
                    // size that is added to the replication offset below.
                    reader.mark();
                    if let Resp::Array(array) = reader.decode_resp()? {
                        let size = reader.reset()?;
                        let mut vec = Vec::with_capacity(array.len());
                        for x in array {
                            if let Resp::BulkBytes(bytes) = x {
                                vec.push(bytes);
                            } else {
                                panic!("Expected BulkString response");
                            }
                        }
                        self.config.repl_offset += size;
                        // Only PSYNC replication reports the offset to the heartbeat thread.
                        if let Mode::PSync = mode {
                            if let Err(error) = self
                                .sender
                                .as_ref()
                                .unwrap()
                                .send(Message::Some(self.config.repl_offset))
                            {
                                error!("repl offset send error: {}", error);
                            }
                        }
                        cmd::parse(vec, handler.deref_mut());
                    } else {
                        panic!("Expected array response");
                    }
                }
            }
            Stream::Tls(tls_stream) => {
                let mut timer = Instant::now();
                let one_sec = Duration::from_secs(1);

                while self.running.load(Ordering::Relaxed) {
                    // The reader lives in its own scope so the stream can be
                    // written to again for the acknowledgement below.
                    {
                        let mut reader = io::CountReader::new(tls_stream);
                        reader.mark();
                        if let Resp::Array(array) = reader.decode_resp()? {
                            let size = reader.reset()?;
                            let mut vec = Vec::with_capacity(array.len());
                            for x in array {
                                if let Resp::BulkBytes(bytes) = x {
                                    vec.push(bytes);
                                } else {
                                    panic!("Expected BulkString response");
                                }
                            }
                            self.config.repl_offset += size;

                            cmd::parse(vec, handler.deref_mut());
                        } else {
                            panic!("Expected array response");
                        }
                    }

                    // Inline heartbeat: acknowledge the offset at most once per second.
                    let elapsed = timer.elapsed();
                    if elapsed.ge(&one_sec) {
                        let offset_str = self.config.repl_offset.to_string();
                        let offset_bytes = offset_str.as_bytes();
                        if let Err(error) = send(tls_stream, b"REPLCONF", &[b"ACK", offset_bytes]) {
                            error!("heartbeat error: {}", error);
                            break;
                        }
                        timer = Instant::now();
                    }
                }
            }
        };
        Ok(())
    }
439
440    /// 获取当前运行的状态,若为false,程序将有序退出
441    fn is_running(&self) -> bool {
442        self.running.load(Ordering::Relaxed)
443    }
444}
445
446impl RedisListener for Listener {
447    /// 程序运行的整体逻辑都在这个方法里面实现
448    ///
449    /// 具体的细节体现在各个方法内
450    fn start(&mut self) -> Result<()> {
451        self.connect()?;
452        self.auth()?;
453        self.send_replica_info()?;
454        let mut mode;
455        loop {
456            mode = self.start_sync()?;
457            match mode {
458                Mode::Wait => {
459                    if self.is_running() {
460                        sleep(Duration::from_secs(5));
461                    } else {
462                        return Ok(());
463                    }
464                }
465                _ => break,
466            }
467        }
468        if !self.config.is_aof {
469            Ok(())
470        } else {
471            self.start_heartbeat(&mode);
472            self.receive_aof(&mode)?;
473            Ok(())
474        }
475    }
476}
477
478impl Drop for Listener {
479    fn drop(&mut self) {
480        if let Some(sender) = self.sender.as_ref() {
481            if let Err(err) = sender.send(Message::Terminate) {
482                error!("Closing heartbeat thread error: {}", err)
483            }
484        }
485        if let Some(thread) = self.heartbeat_thread.thread.take() {
486            if let Err(_) = thread.join() {}
487        }
488    }
489}
490
/// Owns the join handle of the heartbeat thread.
struct HeartbeatWorker {
    /// `None` until a heartbeat thread is spawned, and again after it has been
    /// taken for joining in `Drop`.
    thread: Option<thread::JoinHandle<()>>,
}
494
/// Messages sent from the listener to the heartbeat thread.
enum Message {
    /// Ask the heartbeat thread to leave its loop.
    Terminate,
    /// New replication offset for the heartbeat thread to acknowledge.
    Some(i64),
}
499
/// Outcome of the `PSYNC` request, telling `start_sync` how to proceed.
enum NextStep {
    /// The server answered `FULLRESYNC`; an RDB payload follows.
    FullSync,
    /// The server answered `CONTINUE`; replication resumes without an RDB payload.
    PartialResync,
    /// `PSYNC` was reported as an unknown command; retry with `SYNC`.
    ChangeMode,
    /// The server answered `NOMASTERLINK` or `LOADING`; try again later.
    Wait,
}
506
/// Replication mode reported by `start_sync`.
enum Mode {
    /// Replication was established with `PSYNC`.
    PSync,
    /// Replication fell back to `SYNC`; no heartbeat thread is started in this mode.
    Sync,
    /// The server is not ready yet; `start` sleeps and retries.
    Wait,
}
512
/// Collects the parts needed to construct a [`Listener`].
pub struct Builder {
    /// Listener configuration; required by `build`.
    pub config: Option<Config>,
    /// Custom RDB parser; `build` falls back to `DefaultRDBParser` when unset.
    pub rdb_parser: Option<Rc<RefCell<dyn RDBParser>>>,
    /// Custom event handler; `build` falls back to `NoOpEventHandler` when unset.
    pub event_handler: Option<Rc<RefCell<dyn EventHandler>>>,
    /// Optional module parser, handed to the default RDB parser.
    pub module_parser: Option<Rc<RefCell<dyn ModuleParser>>>,
    /// Shared flag used as the listener's `running` flag; required by `build`.
    pub control_flag: Option<Arc<AtomicBool>>,
}
520
521impl Builder {
522    pub fn new() -> Builder {
523        Builder {
524            config: None,
525            rdb_parser: None,
526            event_handler: None,
527            module_parser: None,
528            control_flag: None,
529        }
530    }
531
532    pub fn with_config(&mut self, config: Config) {
533        self.config = Some(config);
534    }
535
536    pub fn with_rdb_parser(&mut self, parser: Rc<RefCell<dyn RDBParser>>) {
537        self.rdb_parser = Some(parser);
538    }
539
540    pub fn with_event_handler(&mut self, handler: Rc<RefCell<dyn EventHandler>>) {
541        self.event_handler = Some(handler);
542    }
543
544    pub fn with_module_parser(&mut self, parser: Rc<RefCell<dyn ModuleParser>>) {
545        self.module_parser = Some(parser);
546    }
547
548    pub fn with_control_flag(&mut self, flag: Arc<AtomicBool>) {
549        self.control_flag = Some(flag);
550    }
551
552    pub fn build(&mut self) -> Listener {
553        let config = match &self.config {
554            Some(c) => c,
555            None => panic!("Parameter Config is required"),
556        };
557
558        let module_parser = match &self.module_parser {
559            None => None,
560            Some(parser) => Some(parser.clone()),
561        };
562
563        let running = match &self.control_flag {
564            None => panic!("Parameter Control_flag is required"),
565            Some(flag) => flag.clone(),
566        };
567
568        let rdb_parser = match &self.rdb_parser {
569            None => Rc::new(RefCell::new(DefaultRDBParser {
570                running: Arc::clone(&running),
571                module_parser,
572            })),
573            Some(parser) => parser.clone(),
574        };
575
576        let event_handler = match &self.event_handler {
577            None => Rc::new(RefCell::new(NoOpEventHandler {})),
578            Some(handler) => handler.clone(),
579        };
580
581        Listener {
582            config: config.clone(),
583            conn: None,
584            rdb_parser,
585            event_handler,
586            heartbeat_thread: HeartbeatWorker { thread: None },
587            sender: None,
588            running,
589            local_ip: None,
590            local_port: None,
591        }
592    }
593}
594
/// The two kinds of connection a listener can hold.
enum Stream {
    /// Plain TCP connection.
    Tcp(TcpStream),
    /// TLS session layered over a TCP connection.
    Tls(TlsStream<TcpStream>),
}