1#[macro_use]
49extern crate log;
50extern crate crc16;
51extern crate rand;
52
53pub extern crate redis;
54
55use std::cell::RefCell;
56use std::collections::{HashMap, HashSet};
57use std::io::{BufRead, Cursor};
58use std::iter::Iterator;
59use std::thread;
60use std::time::Duration;
61
62use crc16::*;
63use rand::{
64 seq::{IteratorRandom, SliceRandom},
65 thread_rng,
66};
67use redis::{
68 cmd, Cmd, ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, Value,
69};
70
71pub use redis::{pipe, Commands, ConnectionLike, PipelineCommands, RedisResult};
72
/// Number of hash slots a complete slot map must contain; also the modulus
/// applied to the CRC16 of a key when computing its slot.
const SLOT_SIZE: usize = 16_384;
74
75pub struct Builder<T: IntoConnectionInfo> {
77 initial_nodes: Vec<T>,
78 readonly: bool,
79 password: Option<String>,
80 read_timeout: Option<Duration>,
81 write_timeout: Option<Duration>,
82}
83
84impl<T: IntoConnectionInfo> Builder<T> {
85 pub fn new(initial_nodes: Vec<T>) -> Builder<T> {
87 Builder {
88 initial_nodes: initial_nodes,
89 readonly: false,
90 password: None,
91 read_timeout: None,
92 write_timeout: None,
93 }
94 }
95
96 pub fn open(self) -> RedisResult<Client> {
104 Client::open_internal(self)
105 }
106
107 pub fn password(mut self, password: String) -> Builder<T> {
109 self.password = Some(password);
110 return self;
111 }
112
113 pub fn readonly(mut self, readonly: bool) -> Builder<T> {
118 self.readonly = readonly;
119 return self;
120 }
121
122 pub fn read_timeout(mut self, timeout: Option<Duration>) -> Builder<T> {
128 self.read_timeout = timeout;
129 return self;
130 }
131
132 pub fn write_timeout(mut self, timeout: Option<Duration>) -> Builder<T> {
138 self.write_timeout = timeout;
139 return self;
140 }
141}
142
143pub struct Client {
145 initial_nodes: Vec<ConnectionInfo>,
146 readonly: bool,
147 password: Option<String>,
148 read_timeout: Option<Duration>,
149 write_timeout: Option<Duration>,
150}
151
152impl Client {
153 pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<Client> {
161 Builder::new(initial_nodes).open()
162 }
163
164 #[deprecated(note = "Please use Builder instead")]
174 pub fn open_with_auth<T: IntoConnectionInfo>(
175 initial_nodes: Vec<T>,
176 password: String,
177 ) -> RedisResult<Client> {
178 Builder::new(initial_nodes).password(password).open()
179 }
180
181 pub fn get_connection(&self) -> RedisResult<Connection> {
187 Connection::new(
188 self.initial_nodes.clone(),
189 self.readonly,
190 self.password.clone(),
191 self.read_timeout.clone(),
192 self.write_timeout.clone(),
193 )
194 }
195
196 fn open_internal<T: IntoConnectionInfo>(builder: Builder<T>) -> RedisResult<Client> {
197 let mut nodes = Vec::with_capacity(builder.initial_nodes.len());
198 let mut connection_info_password = None::<String>;
199
200 for (index, info) in builder.initial_nodes.into_iter().enumerate() {
201 let info = info.into_connection_info()?;
202 if let ConnectionAddr::Unix(_) = *info.addr {
203 return Err(RedisError::from((ErrorKind::InvalidClientConfig,
204 "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
205 }
206
207 if builder.password.is_none() {
208 if index == 0 {
209 connection_info_password = info.passwd.clone();
210 } else if connection_info_password != info.passwd {
211 return Err(RedisError::from((
212 ErrorKind::InvalidClientConfig,
213 "Cannot use different password among initial nodes.",
214 )));
215 }
216 }
217
218 nodes.push(info);
219 }
220
221 Ok(Client {
222 initial_nodes: nodes,
223 readonly: builder.readonly,
224 password: builder.password.or(connection_info_password),
225 read_timeout: builder.read_timeout,
226 write_timeout: builder.write_timeout,
227 })
228 }
229}
230
231pub struct Connection {
233 initial_nodes: Vec<ConnectionInfo>,
234 connections: RefCell<HashMap<String, redis::Connection>>,
235 slots: RefCell<HashMap<u16, String>>,
236 auto_reconnect: RefCell<bool>,
237 readonly: bool,
238 password: Option<String>,
239 read_timeout: Option<Duration>,
240 write_timeout: Option<Duration>,
241}
242
243impl Connection {
244 fn new(
245 initial_nodes: Vec<ConnectionInfo>,
246 readonly: bool,
247 password: Option<String>,
248 read_timeout: Option<Duration>,
249 write_timeout: Option<Duration>,
250 ) -> RedisResult<Connection> {
251 let connections = Self::create_initial_connections(
252 &initial_nodes,
253 readonly,
254 password.clone(),
255 read_timeout.clone(),
256 write_timeout.clone(),
257 )?;
258 let connection = Connection {
259 initial_nodes,
260 connections: RefCell::new(connections),
261 slots: RefCell::new(HashMap::with_capacity(SLOT_SIZE)),
262 auto_reconnect: RefCell::new(true),
263 readonly,
264 password,
265 read_timeout,
266 write_timeout,
267 };
268 connection.refresh_slots()?;
269
270 Ok(connection)
271 }
272
273 pub fn set_auto_reconnect(&self, value: bool) {
276 let mut auto_reconnect = self.auto_reconnect.borrow_mut();
277 *auto_reconnect = value;
278 }
279
280 #[deprecated(note = "Please use the Builder function instead")]
286 pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
287 let connections = self.connections.borrow();
288 for conn in connections.values() {
289 conn.set_write_timeout(dur)?;
290 }
291 Ok(())
292 }
293
294 #[deprecated(note = "Please use the Builder function instead")]
300 pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
301 let connections = self.connections.borrow();
302 for conn in connections.values() {
303 conn.set_read_timeout(dur)?;
304 }
305 Ok(())
306 }
307
308 pub fn check_connection(&mut self) -> bool {
310 let mut connections = self.connections.borrow_mut();
311 for conn in connections.values_mut() {
312 if !check_connection(conn) {
313 return false;
314 }
315 }
316 true
317 }
318
319 fn create_initial_connections(
320 initial_nodes: &[ConnectionInfo],
321 readonly: bool,
322 password: Option<String>,
323 read_timeout: Option<Duration>,
324 write_timeout: Option<Duration>,
325 ) -> RedisResult<HashMap<String, redis::Connection>> {
326 let mut connections = HashMap::with_capacity(initial_nodes.len());
327
328 for info in initial_nodes.iter() {
329 let addr = match *info.addr {
330 ConnectionAddr::Tcp(ref host, port) => format!("redis://{}:{}", host, port),
331 _ => panic!("No reach."),
332 };
333
334 if let Ok(mut conn) = connect(
335 info.clone(),
336 readonly,
337 password.clone(),
338 read_timeout.clone(),
339 write_timeout.clone(),
340 ) {
341 if check_connection(&mut conn) {
342 connections.insert(addr, conn);
343 break;
344 }
345 }
346 }
347
348 if connections.is_empty() {
349 return Err(RedisError::from((
350 ErrorKind::IoError,
351 "It is failed to check startup nodes.",
352 )));
353 }
354 Ok(connections)
355 }
356
357 fn refresh_slots(&self) -> RedisResult<()> {
359 let mut slots = self.slots.borrow_mut();
360 *slots = {
361 let new_slots = if self.readonly {
362 let mut rng = thread_rng();
363 self.create_new_slots(|slot_data| {
364 let replicas = slot_data.replicas();
365 if replicas.is_empty() {
366 slot_data.master().to_string()
367 } else {
368 replicas.choose(&mut rng).unwrap().to_string()
369 }
370 })
371 } else {
372 self.create_new_slots(|slot_data| slot_data.master().to_string())
373 };
374
375 if new_slots.len() != SLOT_SIZE {
376 return Err(RedisError::from((
377 ErrorKind::ResponseError,
378 "Slot refresh error.",
379 )));
380 }
381 new_slots
382 };
383
384 let mut connections = self.connections.borrow_mut();
385 *connections = {
386 let mut new_connections = HashMap::with_capacity(connections.len());
388
389 for addr in slots.values() {
390 if !new_connections.contains_key(addr) {
391 if connections.contains_key(addr) {
392 let mut conn = connections.remove(addr).unwrap();
393 if check_connection(&mut conn) {
394 new_connections.insert(addr.to_string(), conn);
395 continue;
396 }
397 }
398
399 if let Ok(mut conn) = connect(
400 addr.as_ref(),
401 self.readonly,
402 self.password.clone(),
403 self.read_timeout.clone(),
404 self.write_timeout.clone(),
405 ) {
406 if check_connection(&mut conn) {
407 new_connections.insert(addr.to_string(), conn);
408 }
409 }
410 }
411 }
412 new_connections
413 };
414
415 Ok(())
416 }
417
418 fn create_new_slots<F>(&self, mut get_addr: F) -> HashMap<u16, String>
419 where
420 F: FnMut(&Slot) -> String,
421 {
422 let mut connections = self.connections.borrow_mut();
423 let mut new_slots = HashMap::with_capacity(SLOT_SIZE);
424 let mut rng = thread_rng();
425 let len = connections.len();
426 let mut samples = connections.values_mut().choose_multiple(&mut rng, len);
427
428 for mut conn in samples.iter_mut() {
429 if let Ok(slots_data) = get_slots(&mut conn) {
430 for slot_data in slots_data {
431 for slot in slot_data.start()..=slot_data.end() {
432 new_slots.insert(slot, get_addr(&slot_data));
433 }
434 }
435 break;
436 }
437 }
438 new_slots
439 }
440
441 fn get_connection<'a>(
442 &self,
443 connections: &'a mut HashMap<String, redis::Connection>,
444 slot: u16,
445 ) -> (String, &'a mut redis::Connection) {
446 let slots = self.slots.borrow();
447
448 if let Some(addr) = slots.get(&slot) {
449 if connections.contains_key(addr) {
450 return (addr.clone(), connections.get_mut(addr).unwrap());
451 }
452
453 if let Ok(mut conn) = connect(
455 addr.as_ref(),
456 self.readonly,
457 self.password.clone(),
458 self.read_timeout.clone(),
459 self.write_timeout.clone(),
460 ) {
461 if check_connection(&mut conn) {
462 return (
463 addr.to_string(),
464 connections.entry(addr.to_string()).or_insert(conn),
465 );
466 }
467 }
468 }
469
470 get_random_connection(connections, None)
472 }
473
474 fn request<T, F>(&self, cmd: &[u8], mut func: F) -> RedisResult<T>
475 where
476 F: FnMut(&mut redis::Connection) -> RedisResult<T>,
477 {
478 let mut retries = 16;
479 let mut excludes = HashSet::new();
480 let slot = slot_for_packed_command(cmd);
481
482 loop {
483 let (addr, res) = {
485 let mut connections = self.connections.borrow_mut();
486 let (addr, mut conn) = if !excludes.is_empty() || slot.is_none() {
487 get_random_connection(&mut *connections, Some(&excludes))
488 } else {
489 self.get_connection(&mut *connections, slot.unwrap())
490 };
491 (addr, func(&mut conn))
492 };
493
494 match res {
495 Ok(res) => return Ok(res),
496 Err(err) => {
497 retries -= 1;
498 if retries == 0 {
499 return Err(err);
500 }
501
502 if err.kind() == ErrorKind::ExtensionError {
503 let error_code = err.extension_error_code().unwrap();
504
505 if error_code == "MOVED" || error_code == "ASK" {
506 self.refresh_slots()?;
508 excludes.clear();
509 continue;
510 } else if error_code == "TRYAGAIN" || error_code == "CLUSTERDOWN" {
511 let sleep_time = 2u64.pow(16 - retries.max(9)) * 10;
513 thread::sleep(Duration::from_millis(sleep_time));
514 excludes.clear();
515 continue;
516 }
517 } else if *self.auto_reconnect.borrow()
518 && err.kind() == ErrorKind::ResponseError
519 {
520 let new_connections = Self::create_initial_connections(
522 &self.initial_nodes,
523 self.readonly,
524 self.password.clone(),
525 self.read_timeout.clone(),
526 self.write_timeout.clone(),
527 )?;
528 {
529 let mut connections = self.connections.borrow_mut();
530 *connections = new_connections;
531 }
532 self.refresh_slots()?;
533 excludes.clear();
534 continue;
535 }
536
537 excludes.insert(addr);
538
539 let connections = self.connections.borrow();
540 if excludes.len() >= connections.len() {
541 return Err(err);
542 }
543 }
544 }
545 }
546 }
547}
548
549impl ConnectionLike for Connection {
550 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
551 self.request(cmd, move |conn| conn.req_packed_command(cmd))
552 }
553
554 fn req_packed_commands(
555 &mut self,
556 cmd: &[u8],
557 offset: usize,
558 count: usize,
559 ) -> RedisResult<Vec<Value>> {
560 self.request(cmd, move |conn| {
561 conn.req_packed_commands(cmd, offset, count)
562 })
563 }
564
565 fn get_db(&self) -> i64 {
566 0
567 }
568}
569
570impl Clone for Client {
571 fn clone(&self) -> Client {
572 Client::open(self.initial_nodes.clone()).unwrap()
573 }
574}
575
576fn connect<T: IntoConnectionInfo>(
577 info: T,
578 readonly: bool,
579 password: Option<String>,
580 read_timeout: Option<Duration>,
581 write_timeout: Option<Duration>,
582) -> RedisResult<redis::Connection>
583where
584 T: std::fmt::Debug,
585{
586 let mut connection_info = info.into_connection_info()?;
587 info!("Checking connection of {:?}", connection_info);
588 connection_info.passwd = password;
589 let client = redis::Client::open(connection_info)?;
590
591 let mut con = client.get_connection()?;
592 if readonly {
593 cmd("READONLY").query(&mut con)?;
594 }
595 con.set_read_timeout(read_timeout)?;
596 con.set_write_timeout(write_timeout)?;
597 Ok(con)
598}
599
600fn check_connection(conn: &mut redis::Connection) -> bool {
601 let mut cmd = Cmd::new();
602 cmd.arg("PING");
603 cmd.query::<String>(conn).is_ok()
604}
605
606fn get_random_connection<'a>(
607 connections: &'a mut HashMap<String, redis::Connection>,
608 excludes: Option<&'a HashSet<String>>,
609) -> (String, &'a mut redis::Connection) {
610 let mut rng = thread_rng();
611 let addr = match excludes {
612 Some(excludes) if excludes.len() < connections.len() => connections
613 .keys()
614 .filter(|key| !excludes.contains(*key))
615 .choose(&mut rng)
616 .unwrap()
617 .to_string(),
618 _ => connections.keys().choose(&mut rng).unwrap().to_string(),
619 };
620
621 let con = connections.get_mut(&addr).unwrap();
622 (addr, con)
623}
624
625fn slot_for_packed_command(cmd: &[u8]) -> Option<u16> {
626 let args = unpack_command(cmd);
627 if args.len() > 1 {
628 let key = match get_hashtag(&args[1]) {
629 Some(tag) => tag,
630 None => &args[1],
631 };
632 Some(State::<XMODEM>::calculate(key) % SLOT_SIZE as u16)
633 } else {
634 None
635 }
636}
637
/// Extracts the hash tag of a key.
///
/// The tag is the text between the first `{` and the first `}` that follows
/// it. Returns `None` when there is no `{`, no `}` after it, or nothing
/// between the two, in which case the whole key is to be hashed.
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
    let open = key.iter().position(|&byte| byte == b'{')?;
    let tag_start = open + 1;
    // Offset of `}` relative to `tag_start`, i.e. the length of the tag.
    let tag_len = key[tag_start..].iter().position(|&byte| byte == b'}')?;

    if tag_len == 0 {
        None
    } else {
        Some(&key[tag_start..tag_start + tag_len])
    }
}
657
/// Splits a packed (RESP-encoded) command buffer into its arguments.
///
/// Array headers (`*N`) are skipped and every bulk string (`$len` followed by
/// `len` bytes) is read by its declared length, so arguments are handled as
/// binary data: they may start with `*` or `$`, contain line breaks, or be
/// invalid UTF-8. Arguments of consecutive commands in one buffer are
/// returned as one flat list.
///
/// A line that is neither an array nor a bulk header is returned as an
/// argument as-is. Parsing stops at a bulk header whose length is malformed
/// or exceeds the remaining input; arguments read so far are returned.
fn unpack_command(cmd: &[u8]) -> Vec<Vec<u8>> {
    let mut args: Vec<Vec<u8>> = Vec::new();
    let mut cursor = Cursor::new(cmd);
    let mut line: Vec<u8> = Vec::new();

    loop {
        line.clear();
        match cursor.read_until(b'\n', &mut line) {
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }
        // Strip the line terminator (CRLF, or a bare LF).
        if line.ends_with(b"\n") {
            line.pop();
        }
        if line.ends_with(b"\r") {
            line.pop();
        }

        match line.first() {
            Some(&b'*') => continue,
            Some(&b'$') => {
                let declared = std::str::from_utf8(&line[1..])
                    .ok()
                    .and_then(|text| text.parse::<usize>().ok());
                let len = match declared {
                    Some(len) => len,
                    None => break,
                };

                let start = cursor.position() as usize;
                let end = match start.checked_add(len) {
                    Some(end) if end <= cmd.len() => end,
                    _ => break,
                };
                args.push(cmd[start..end].to_vec());

                // Continue after the payload and its trailing CRLF.
                let next = if cmd[end..].starts_with(b"\r\n") {
                    end + 2
                } else {
                    end
                };
                cursor.set_position(next as u64);
            }
            _ => args.push(line.clone()),
        }
    }
    args
}
671
/// One slot range from a `CLUSTER SLOTS` reply and the nodes serving it.
#[derive(Debug)]
struct Slot {
    /// First slot of the range (inclusive).
    start: u16,
    /// Last slot of the range (inclusive).
    end: u16,
    /// Address of the first node listed for the range.
    master: String,
    /// Addresses of the remaining nodes listed for the range.
    replicas: Vec<String>,
}

impl Slot {
    /// First slot of the range (inclusive).
    pub fn start(&self) -> u16 {
        self.start
    }

    /// Last slot of the range (inclusive).
    pub fn end(&self) -> u16 {
        self.end
    }

    /// Address of the master node.
    pub fn master(&self) -> &str {
        &self.master
    }

    /// Addresses of the replica nodes; empty when the range has none.
    pub fn replicas(&self) -> &[String] {
        &self.replicas
    }
}
695
696fn get_slots(connection: &mut redis::Connection) -> RedisResult<Vec<Slot>> {
698 let mut cmd = Cmd::new();
699 cmd.arg("CLUSTER").arg("SLOTS");
700 let packed_command = cmd.get_packed_command();
701 let value = connection.req_packed_command(&packed_command)?;
702
703 let mut result = Vec::with_capacity(2);
705
706 if let Value::Bulk(items) = value {
707 let mut iter = items.into_iter();
708 while let Some(Value::Bulk(item)) = iter.next() {
709 if item.len() < 3 {
710 continue;
711 }
712
713 let start = if let Value::Int(start) = item[0] {
714 start as u16
715 } else {
716 continue;
717 };
718
719 let end = if let Value::Int(end) = item[1] {
720 end as u16
721 } else {
722 continue;
723 };
724
725 let mut nodes: Vec<String> = item
726 .into_iter()
727 .skip(2)
728 .filter_map(|node| {
729 if let Value::Bulk(node) = node {
730 if node.len() < 2 {
731 return None;
732 }
733
734 let ip = if let Value::Data(ref ip) = node[0] {
735 String::from_utf8_lossy(ip)
736 } else {
737 return None;
738 };
739 if ip.is_empty() {
740 return None;
741 }
742
743 let port = if let Value::Int(port) = node[1] {
744 port
745 } else {
746 return None;
747 };
748 info!("Parsed host:port slot {}:{}", ip, port);
749 Some(format!("redis://{}:{}", ip, port))
750 } else {
751 None
752 }
753 })
754 .collect();
755
756 if nodes.is_empty() {
757 continue;
758 }
759
760 let replicas = nodes.split_off(1);
761 result.push(Slot {
762 start,
763 end,
764 master: nodes.pop().unwrap(),
765 replicas,
766 });
767 info!("Added slot {:?}", result.last());
768 }
769 }
770
771 info!("Slots: {:?}", result);
772
773 Ok(result)
774}
775
#[cfg(test)]
mod tests {
    use super::{Builder, Client};
    use redis::{ConnectionInfo, IntoConnectionInfo};

    /// Parses every URL into connection info, panicking on an invalid URL.
    fn parse_nodes(urls: &[&str]) -> Vec<ConnectionInfo> {
        urls.iter()
            .map(|url| (*url).into_connection_info().unwrap())
            .collect()
    }

    /// Three local nodes without a password.
    fn get_connection_data() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://127.0.0.1:6379",
            "redis://127.0.0.1:6378",
            "redis://127.0.0.1:6377",
        ])
    }

    /// Three local nodes sharing the password `password`.
    fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://:password@127.0.0.1:6379",
            "redis://:password@127.0.0.1:6378",
            "redis://:password@127.0.0.1:6377",
        ])
    }

    /// Without any password the client stores none.
    #[test]
    fn give_no_password() {
        let client = Client::open(get_connection_data()).unwrap();
        assert_eq!(client.password, None);
    }

    /// A password shared by all initial nodes is adopted by the client.
    #[test]
    fn give_password_by_initial_nodes() {
        let client = Client::open(get_connection_data_with_password()).unwrap();
        assert_eq!(client.password, Some("password".to_string()));
    }

    /// Initial nodes with differing passwords are rejected.
    #[test]
    fn give_different_password_by_initial_nodes() {
        let nodes = vec![
            "redis://:password1@127.0.0.1:6379",
            "redis://:password2@127.0.0.1:6378",
            "redis://:password3@127.0.0.1:6377",
        ];
        assert!(Client::open(nodes).is_err());
    }

    /// A password set on the builder wins over the nodes' password.
    #[test]
    fn give_password_by_method() {
        let builder =
            Builder::new(get_connection_data_with_password()).password("pass".to_string());
        let client = builder.open().unwrap();
        assert_eq!(client.password, Some("pass".to_string()));
    }
}