1use std::cell::RefCell;
7use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result, Write};
8use std::net::TcpStream;
9use std::ops::DerefMut;
10use std::rc::Rc;
11use std::result::Result::Ok;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{mpsc, Arc};
14use std::thread;
15use std::thread::sleep;
16use std::time::{Duration, Instant};
17
18use log::{error, info, warn};
19use native_tls::{Identity, TlsConnector, TlsStream};
20
21use crate::config::Config;
22use crate::io::send;
23use crate::rdb::DefaultRDBParser;
24use crate::resp::{Resp, RespDecode, Type};
25use crate::{cmd, io, EventHandler, ModuleParser, NoOpEventHandler, RDBParser, RedisListener};
26use std::fs::File;
27
/// Replication client that connects to a Redis server, performs the
/// replication handshake and forwards the received data to the configured
/// RDB parser and event handler.
pub struct Listener {
    /// Connection and replication settings. `repl_id` and `repl_offset` are
    /// updated by the listener while it synchronizes.
    pub config: Config,
    /// Active connection (plain TCP or TLS); `None` until `connect` succeeds.
    conn: Option<Stream>,
    /// Parser that consumes the RDB payload of a full synchronization.
    rdb_parser: Rc<RefCell<dyn RDBParser>>,
    /// Handler that receives parsed RDB data and replicated commands.
    event_handler: Rc<RefCell<dyn EventHandler>>,
    /// Handle of the background thread that sends `REPLCONF ACK` heartbeats.
    heartbeat_thread: HeartbeatWorker,
    /// Channel used to pass offsets and the terminate signal to the heartbeat thread.
    sender: Option<mpsc::Sender<Message>>,
    /// Shared flag; the listener keeps receiving commands while it is `true`.
    running: Arc<AtomicBool>,
    /// Local IP address of the connection, recorded by `connect`.
    local_ip: Option<String>,
    /// Local port of the connection, recorded by `connect`.
    local_port: Option<u16>,
}
40
41impl Listener {
42 fn connect(&mut self) -> Result<()> {
44 let addr = format!("{}:{}", &self.config.host, self.config.port);
45 let stream = TcpStream::connect(&addr)?;
46 stream
47 .set_read_timeout(self.config.read_timeout)
48 .expect("read timeout set failed");
49 stream
50 .set_write_timeout(self.config.write_timeout)
51 .expect("write timeout set failed");
52
53 let socket_addr = stream.local_addr().unwrap();
54 let local_ip = socket_addr.ip().to_string();
55 self.local_ip = Some(local_ip);
56
57 let local_port = socket_addr.port();
58 self.local_port = Some(local_port);
59
60 if self.config.is_tls_enabled {
61 let mut builder = TlsConnector::builder();
62 builder.danger_accept_invalid_hostnames(self.config.is_tls_insecure);
63 builder.danger_accept_invalid_certs(self.config.is_tls_insecure);
64
65 if let Some(id) = &self.config.identity {
66 let mut file = File::open(id)?;
67 let mut buff = Vec::new();
68 file.read_to_end(&mut buff)?;
69 let identity_passwd = match &self.config.identity_passwd {
70 None => "",
71 Some(passwd) => passwd.as_str(),
72 };
73 let identity = Identity::from_pkcs12(&buff, identity_passwd).expect("解析key失败");
74 builder.identity(identity);
75 }
76
77 let connector = builder.build().unwrap();
78 let tls_stream = connector
79 .connect(&self.config.host, stream)
80 .expect("TLS connect failed");
81 self.conn = Option::Some(Stream::Tls(tls_stream));
82 } else {
83 self.conn = Option::Some(Stream::Tcp(stream));
84 }
85 info!("Connected to server {}", &addr);
86 Ok(())
87 }
88
89 fn auth(&mut self) -> Result<()> {
91 if !self.config.password.is_empty() {
92 let mut args = Vec::with_capacity(2);
93 if !self.config.username.is_empty() {
94 args.push(self.config.username.as_bytes());
95 }
96 args.push(self.config.password.as_bytes());
97 let conn = self.conn.as_mut().unwrap();
98 let conn: &mut dyn Read = match conn {
99 Stream::Tcp(tcp_stream) => {
100 send(tcp_stream, b"AUTH", &args)?;
101 tcp_stream
102 }
103 Stream::Tls(tls_stream) => {
104 send(tls_stream, b"AUTH", &args)?;
105 tls_stream
106 }
107 };
108 conn.decode_resp()?;
109 }
110 Ok(())
111 }
112
113 fn send_replica_info(&mut self) -> Result<()> {
115 let port = self.local_port.unwrap().to_string();
116 let port = port.as_bytes();
117
118 let ip = self.local_ip.as_ref().unwrap();
119 let ip = ip.as_bytes();
120
121 let conn = self.conn.as_mut().unwrap();
122 match conn {
123 Stream::Tcp(tcp_stream) => Listener::de_send_replica_info(&port, &ip, tcp_stream)?,
124 Stream::Tls(tls_stream) => Listener::de_send_replica_info(&port, &ip, tls_stream)?,
125 };
126 Ok(())
127 }
128
129 fn de_send_replica_info<T: Write + Read>(port: &&[u8], ip: &&[u8], tcp_stream: &mut T) -> Result<()> {
130 info!("PING");
131 send(tcp_stream, b"PING", &vec![])?;
132 Listener::reply(tcp_stream)?;
133
134 info!("REPLCONF listening-port {}", String::from_utf8_lossy(*port));
135 send(tcp_stream, b"REPLCONF", &[b"listening-port", port])?;
136 Listener::reply(tcp_stream)?;
137
138 info!("REPLCONF ip-address {}", String::from_utf8_lossy(*ip));
139 send(tcp_stream, b"REPLCONF", &[b"ip-address", ip])?;
140 Listener::reply(tcp_stream)?;
141
142 info!("REPLCONF capa eof");
143 send(tcp_stream, b"REPLCONF", &[b"capa", b"eof"])?;
144 Listener::reply(tcp_stream)?;
145
146 info!("REPLCONF capa psync2");
147 send(tcp_stream, b"REPLCONF", &[b"capa", b"psync2"])?;
148 Listener::reply(tcp_stream)
149 }
150
151 fn reply<T: Read>(tcp_stream: &mut T) -> Result<()> {
152 match tcp_stream.decode_resp()? {
153 Resp::String(str) => info!("{}", str),
154 Resp::Error(err) => {
155 warn!("{}", &err);
156 if (err.contains("NOAUTH") || err.contains("NOPERM"))
157 && !err.contains("no password")
158 && !err.contains("Unrecognized REPLCONF option")
159 {
160 return Err(Error::new(ErrorKind::InvalidData, err));
161 }
162 }
163 _ => panic!("Unexpected response type"),
164 }
165 Ok(())
166 }
167
168 fn start_sync(&mut self) -> Result<Mode> {
171 let (next_step, mut length) = self.psync()?;
172 match next_step {
173 NextStep::FullSync | NextStep::ChangeMode => {
174 let mode;
175 if let NextStep::ChangeMode = next_step {
176 info!("源Redis不支持PSYNC命令, 使用SYNC命令再次进行尝试");
177 mode = Mode::Sync;
178 length = self.sync()?;
179 } else {
180 mode = Mode::PSync;
181 }
182 if length != -1 {
183 info!("Full Sync, size: {}bytes", length);
184 } else {
185 info!("Disk-less replication.");
186 }
187 let conn = self.conn.as_mut().unwrap();
188
189 let conn: &mut dyn Read = match conn {
190 Stream::Tcp(tcp_stream) => tcp_stream,
191 Stream::Tls(tls_stream) => tls_stream,
192 };
193 let mut reader = BufReader::new(conn);
194 reader.fill_buf()?;
195 if length != -1 && self.config.is_discard_rdb {
196 info!("跳过RDB不进行处理");
197 io::skip(&mut reader, length as isize)?;
198 } else {
199 let mut event_handler = self.event_handler.borrow_mut();
200 let mut rdb_parser = self.rdb_parser.borrow_mut();
201 rdb_parser.parse(&mut reader, length, event_handler.deref_mut())?;
202 if length == -1 {
203 io::skip(&mut reader, 40)?;
204 }
205 }
206 Ok(mode)
207 }
208 NextStep::PartialResync => {
209 info!("PSYNC进度恢复");
210 Ok(Mode::PSync)
211 }
212 NextStep::Wait => Ok(Mode::Wait),
213 }
214 }
215
216 fn psync(&mut self) -> Result<(NextStep, i64)> {
217 let offset = self.config.repl_offset.to_string();
218 let repl_offset = offset.as_bytes();
219 let repl_id = self.config.repl_id.as_bytes();
220
221 let conn = self.conn.as_mut().unwrap();
222 let conn: &mut dyn Read = match conn {
223 Stream::Tcp(tcp_stream) => {
224 send(tcp_stream, b"PSYNC", &[repl_id, repl_offset])?;
225
226 tcp_stream
227 }
228 Stream::Tls(tls_stream) => {
229 send(tls_stream, b"PSYNC", &[repl_id, repl_offset])?;
230 tls_stream
231 }
232 };
233
234 match conn.decode_resp() {
235 Ok(response) => {
236 if let Resp::String(resp) = &response {
237 info!("{}", resp);
238 if resp.starts_with("FULLRESYNC") {
239 let mut iter = resp.split_whitespace();
240 if let Some(repl_id) = iter.nth(1) {
241 self.config.repl_id = repl_id.to_owned();
242 } else {
243 panic!("Expect replication id, but got None");
244 }
245 if let Some(repl_offset) = iter.next() {
246 self.config.repl_offset = repl_offset.parse::<i64>().unwrap();
247 } else {
248 panic!("Expect replication offset, but got None");
249 }
250 info!("等待Redis dump完成...");
251 if let Type::BulkString = conn.decode_type()? {
252 let reply = conn.decode_string()?;
253 if reply.starts_with("EOF") {
254 return Ok((NextStep::FullSync, -1));
255 } else {
256 let length = reply.parse::<i64>().unwrap();
257 return Ok((NextStep::FullSync, length));
258 }
259 } else {
260 panic!("Expect BulkString response");
261 }
262 } else if resp.starts_with("CONTINUE") {
263 let mut iter = resp.split_whitespace();
264 if let Some(repl_id) = iter.nth(1) {
265 if !repl_id.eq(&self.config.repl_id) {
266 self.config.repl_id = repl_id.to_owned();
267 }
268 }
269 return Ok((NextStep::PartialResync, -1));
270 } else if resp.starts_with("NOMASTERLINK") {
271 return Ok((NextStep::Wait, -1));
272 } else if resp.starts_with("LOADING") {
273 return Ok((NextStep::Wait, -1));
274 }
275 }
276 panic!("Unexpected Response: {:?}", response);
277 }
278 Err(error) => {
279 if error.to_string().eq("ERR unknown command 'PSYNC'") {
280 return Ok((NextStep::ChangeMode, -1));
281 } else {
282 return Err(error);
283 }
284 }
285 }
286 }
287
288 fn sync(&mut self) -> Result<i64> {
289 let conn = self.conn.as_mut().unwrap();
290 let conn: &mut dyn Read = match conn {
291 Stream::Tcp(tcp_stream) => {
292 send(tcp_stream, b"SYNC", &vec![])?;
293 tcp_stream
294 }
295 Stream::Tls(tls_stream) => {
296 send(tls_stream, b"SYNC", &vec![])?;
297 tls_stream
298 }
299 };
300 if let Type::BulkString = conn.decode_type()? {
301 if let Resp::Int(length) = conn.decode_int()? {
302 return Ok(length);
303 } else {
304 panic!("Expect int response")
305 }
306 } else {
307 panic!("Expect BulkString response");
308 }
309 }
310
311 fn start_heartbeat(&mut self, mode: &Mode) {
313 if !self.is_running() {
314 return;
315 }
316 if let Mode::Sync = mode {
317 return;
318 }
319 if self.config.is_tls_enabled {
320 return;
321 }
322 let conn = self.conn.as_ref().unwrap();
323 let conn = match conn {
324 Stream::Tcp(tcp_stream) => tcp_stream,
325 Stream::Tls(_) => panic!("Expect TcpStream"),
326 };
327 let mut conn_clone = conn.try_clone().unwrap();
328
329 let (sender, receiver) = mpsc::channel();
330
331 let t = thread::spawn(move || {
332 let mut offset = 0;
333 let mut timer = Instant::now();
334 let one_sec = Duration::from_secs(1);
335 info!("heartbeat thread started");
336 loop {
337 match receiver.recv_timeout(one_sec) {
338 Ok(Message::Terminate) => break,
339 Ok(Message::Some(new_offset)) => {
340 offset = new_offset;
341 }
342 Err(_) => {}
343 };
344 let elapsed = timer.elapsed();
345 if elapsed.ge(&one_sec) {
346 let offset_str = offset.to_string();
347 let offset_bytes = offset_str.as_bytes();
348 if let Err(error) = send(&mut conn_clone, b"REPLCONF", &[b"ACK", offset_bytes]) {
349 error!("heartbeat error: {}", error);
350 break;
351 }
352 timer = Instant::now();
353 }
354 }
355 info!("heartbeat thread terminated");
356 });
357 self.heartbeat_thread = HeartbeatWorker { thread: Some(t) };
358 self.sender = Some(sender);
359 }
360
361 fn receive_aof(&mut self, mode: &Mode) -> Result<()> {
362 let mut handler = self.event_handler.as_ref().borrow_mut();
363
364 let __conn = self.conn.as_mut().unwrap();
365 match __conn {
366 Stream::Tcp(tcp_stream) => {
367 let mut reader = io::CountReader::new(tcp_stream);
368
369 while self.running.load(Ordering::Relaxed) {
370 reader.mark();
371 if let Resp::Array(array) = reader.decode_resp()? {
372 let size = reader.reset()?;
373 let mut vec = Vec::with_capacity(array.len());
374 for x in array {
375 if let Resp::BulkBytes(bytes) = x {
376 vec.push(bytes);
377 } else {
378 panic!("Expected BulkString response");
379 }
380 }
381 self.config.repl_offset += size;
382 if let Mode::PSync = mode {
383 if let Err(error) = self
384 .sender
385 .as_ref()
386 .unwrap()
387 .send(Message::Some(self.config.repl_offset))
388 {
389 error!("repl offset send error: {}", error);
390 }
391 }
392 cmd::parse(vec, handler.deref_mut());
393 } else {
394 panic!("Expected array response");
395 }
396 }
397 }
398 Stream::Tls(tls_stream) => {
399 let mut timer = Instant::now();
400 let one_sec = Duration::from_secs(1);
401
402 while self.running.load(Ordering::Relaxed) {
403 {
404 let mut reader = io::CountReader::new(tls_stream);
405 reader.mark();
406 if let Resp::Array(array) = reader.decode_resp()? {
407 let size = reader.reset()?;
408 let mut vec = Vec::with_capacity(array.len());
409 for x in array {
410 if let Resp::BulkBytes(bytes) = x {
411 vec.push(bytes);
412 } else {
413 panic!("Expected BulkString response");
414 }
415 }
416 self.config.repl_offset += size;
417
418 cmd::parse(vec, handler.deref_mut());
419 } else {
420 panic!("Expected array response");
421 }
422 }
423
424 let elapsed = timer.elapsed();
425 if elapsed.ge(&one_sec) {
426 let offset_str = self.config.repl_offset.to_string();
427 let offset_bytes = offset_str.as_bytes();
428 if let Err(error) = send(tls_stream, b"REPLCONF", &[b"ACK", offset_bytes]) {
429 error!("heartbeat error: {}", error);
430 break;
431 }
432 timer = Instant::now();
433 }
434 }
435 }
436 };
437 Ok(())
438 }
439
440 fn is_running(&self) -> bool {
442 self.running.load(Ordering::Relaxed)
443 }
444}
445
446impl RedisListener for Listener {
447 fn start(&mut self) -> Result<()> {
451 self.connect()?;
452 self.auth()?;
453 self.send_replica_info()?;
454 let mut mode;
455 loop {
456 mode = self.start_sync()?;
457 match mode {
458 Mode::Wait => {
459 if self.is_running() {
460 sleep(Duration::from_secs(5));
461 } else {
462 return Ok(());
463 }
464 }
465 _ => break,
466 }
467 }
468 if !self.config.is_aof {
469 Ok(())
470 } else {
471 self.start_heartbeat(&mode);
472 self.receive_aof(&mode)?;
473 Ok(())
474 }
475 }
476}
477
478impl Drop for Listener {
479 fn drop(&mut self) {
480 if let Some(sender) = self.sender.as_ref() {
481 if let Err(err) = sender.send(Message::Terminate) {
482 error!("Closing heartbeat thread error: {}", err)
483 }
484 }
485 if let Some(thread) = self.heartbeat_thread.thread.take() {
486 if let Err(_) = thread.join() {}
487 }
488 }
489}
490
/// Owner of the heartbeat thread handle.
struct HeartbeatWorker {
    /// Join handle of the heartbeat thread; `None` when no thread was started
    /// or after it has been taken for joining.
    thread: Option<thread::JoinHandle<()>>,
}
494
/// Message sent from the listener to the heartbeat thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Message {
    /// Ask the heartbeat thread to stop.
    Terminate,
    /// Latest replication offset to acknowledge.
    Some(i64),
}
499
/// Outcome of the `PSYNC` negotiation, telling the caller how to proceed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NextStep {
    /// The server will send a full RDB payload.
    FullSync,
    /// The server continues from the requested offset.
    PartialResync,
    /// `PSYNC` is not supported; retry with `SYNC`.
    ChangeMode,
    /// The server is not ready yet; retry later.
    Wait,
}
506
/// Replication mode selected by `start_sync`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Synchronization was negotiated with `PSYNC`.
    PSync,
    /// Synchronization fell back to `SYNC`.
    Sync,
    /// The server is not ready; synchronization has to be retried.
    Wait,
}
512
513pub struct Builder {
514 pub config: Option<Config>,
515 pub rdb_parser: Option<Rc<RefCell<dyn RDBParser>>>,
516 pub event_handler: Option<Rc<RefCell<dyn EventHandler>>>,
517 pub module_parser: Option<Rc<RefCell<dyn ModuleParser>>>,
518 pub control_flag: Option<Arc<AtomicBool>>,
519}
520
521impl Builder {
522 pub fn new() -> Builder {
523 Builder {
524 config: None,
525 rdb_parser: None,
526 event_handler: None,
527 module_parser: None,
528 control_flag: None,
529 }
530 }
531
532 pub fn with_config(&mut self, config: Config) {
533 self.config = Some(config);
534 }
535
536 pub fn with_rdb_parser(&mut self, parser: Rc<RefCell<dyn RDBParser>>) {
537 self.rdb_parser = Some(parser);
538 }
539
540 pub fn with_event_handler(&mut self, handler: Rc<RefCell<dyn EventHandler>>) {
541 self.event_handler = Some(handler);
542 }
543
544 pub fn with_module_parser(&mut self, parser: Rc<RefCell<dyn ModuleParser>>) {
545 self.module_parser = Some(parser);
546 }
547
548 pub fn with_control_flag(&mut self, flag: Arc<AtomicBool>) {
549 self.control_flag = Some(flag);
550 }
551
552 pub fn build(&mut self) -> Listener {
553 let config = match &self.config {
554 Some(c) => c,
555 None => panic!("Parameter Config is required"),
556 };
557
558 let module_parser = match &self.module_parser {
559 None => None,
560 Some(parser) => Some(parser.clone()),
561 };
562
563 let running = match &self.control_flag {
564 None => panic!("Parameter Control_flag is required"),
565 Some(flag) => flag.clone(),
566 };
567
568 let rdb_parser = match &self.rdb_parser {
569 None => Rc::new(RefCell::new(DefaultRDBParser {
570 running: Arc::clone(&running),
571 module_parser,
572 })),
573 Some(parser) => parser.clone(),
574 };
575
576 let event_handler = match &self.event_handler {
577 None => Rc::new(RefCell::new(NoOpEventHandler {})),
578 Some(handler) => handler.clone(),
579 };
580
581 Listener {
582 config: config.clone(),
583 conn: None,
584 rdb_parser,
585 event_handler,
586 heartbeat_thread: HeartbeatWorker { thread: None },
587 sender: None,
588 running,
589 local_ip: None,
590 local_port: None,
591 }
592 }
593}
594
/// Transport used for the server connection.
enum Stream {
    /// Plain TCP connection.
    Tcp(TcpStream),
    /// TLS session layered on top of a TCP connection.
    Tls(TlsStream<TcpStream>),
}