Skip to main content

redis_cluster_rs/
lib.rs

1//! This is a rust implementation for Redis cluster library.
2//!
3//! This library extends redis-rs library to be able to use cluster.
//! The cluster connection implements the `ConnectionLike` and `Commands` traits,
//! so you can use redis-rs's access methods on it.
6//! If you want more information, read document of redis-rs.
7//!
//! Note that this library does not currently support Pub/Sub.
9//!
10//! # Example
11//! ```rust,no_run
12//! extern crate redis_cluster_rs;
13//!
14//! use redis_cluster_rs::{Client, Commands};
15//!
16//! fn main() {
17//!     let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
18//!     let client = Client::open(nodes).unwrap();
19//!     let mut connection = client.get_connection().unwrap();
20//!
21//!     let _: () = connection.set("test", "test_data").unwrap();
22//!     let res: String = connection.get("test").unwrap();
23//!
24//!     assert_eq!(res, "test_data");
25//! }
26//! ```
27//!
28//! # Pipelining
29//! ```rust,no_run
30//! extern crate redis_cluster_rs;
31//!
32//! use redis_cluster_rs::{Client, PipelineCommands, pipe};
33//!
34//! fn main() {
35//!     let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
36//!     let client = Client::open(nodes).unwrap();
37//!     let mut connection = client.get_connection().unwrap();
38//!
39//!     let key = "test";
40//!
41//!     let _: () = pipe()
42//!         .rpush(key, "123").ignore()
43//!         .ltrim(key, -10, -1).ignore()
44//!         .expire(key, 60).ignore()
45//!         .query(&mut connection).unwrap();
46//! }
47//! ```
48#[macro_use]
49extern crate log;
50extern crate crc16;
51extern crate rand;
52
53pub extern crate redis;
54
55use std::cell::RefCell;
56use std::collections::{HashMap, HashSet};
57use std::io::{BufRead, Cursor};
58use std::iter::Iterator;
59use std::thread;
60use std::time::Duration;
61
62use crc16::*;
63use rand::{
64    seq::{IteratorRandom, SliceRandom},
65    thread_rng,
66};
67use redis::{
68    cmd, Cmd, ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, Value,
69};
70
71pub use redis::{pipe, Commands, ConnectionLike, PipelineCommands, RedisResult};
72
/// Number of hash slots: a refreshed slot map must contain exactly this many
/// entries, and key hashes are reduced modulo this value to pick a slot.
const SLOT_SIZE: usize = 16384;
74
75/// This is a Builder of Redis cluster client.
76pub struct Builder<T: IntoConnectionInfo> {
77    initial_nodes: Vec<T>,
78    readonly: bool,
79    password: Option<String>,
80    read_timeout: Option<Duration>,
81    write_timeout: Option<Duration>,
82}
83
84impl<T: IntoConnectionInfo> Builder<T> {
85    /// Generate the base configuration for new Client.
86    pub fn new(initial_nodes: Vec<T>) -> Builder<T> {
87        Builder {
88            initial_nodes: initial_nodes,
89            readonly: false,
90            password: None,
91            read_timeout: None,
92            write_timeout: None,
93        }
94    }
95
96    /// Connect to a redis cluster server and return a cluster client.
97    /// This does not actually open a connection yet but it performs some basic checks on the URL.
98    /// The password of initial nodes must be the same all.
99    ///
100    /// # Errors
101    ///
102    /// If it is failed to parse initial_nodes or the initial nodes has different password, an error is returned.
103    pub fn open(self) -> RedisResult<Client> {
104        Client::open_internal(self)
105    }
106
107    /// Set password for new Client.
108    pub fn password(mut self, password: String) -> Builder<T> {
109        self.password = Some(password);
110        return self;
111    }
112
113    /// Set read only mode for new Client.
114    /// Default is not read only mode.
115    /// When it is set to readonly mode, all query use replica nodes except there are no replica nodes.
116    /// If there are no replica nodes, it use master node.
117    pub fn readonly(mut self, readonly: bool) -> Builder<T> {
118        self.readonly = readonly;
119        return self;
120    }
121
122    /// Set the read timeout for the connection.
123    ///
124    /// If the provided value is `None`, then `recv_response` call will
125    /// block indefinitely. It is an error to pass the zero `Duration` to this
126    /// method.
127    pub fn read_timeout(mut self, timeout: Option<Duration>) -> Builder<T> {
128        self.read_timeout = timeout;
129        return self;
130    }
131
132    /// Set the write timeout for the connection.
133    ///
134    /// If the provided value is `None`, then `send_packed_command` call will
135    /// block indefinitely. It is an error to pass the zero `Duration` to this
136    /// method.
137    pub fn write_timeout(mut self, timeout: Option<Duration>) -> Builder<T> {
138        self.write_timeout = timeout;
139        return self;
140    }
141}
142
/// This is a Redis cluster client.
///
/// It only stores validated configuration; node connections are opened by
/// `get_connection`.
pub struct Client {
    // Seed nodes, already converted to connection infos.
    initial_nodes: Vec<ConnectionInfo>,
    // Whether connections created from this client run in read-only mode.
    readonly: bool,
    // Password used for every node: the Builder's password, or else the one
    // shared by the initial nodes.
    password: Option<String>,
    // Timeouts handed to every connection that is opened.
    read_timeout: Option<Duration>,
    write_timeout: Option<Duration>,
}
151
152impl Client {
153    /// Connect to a redis cluster server and return a cluster client.
154    /// This does not actually open a connection yet but it performs some basic checks on the URL.
155    /// The password of initial nodes must be the same all.
156    ///
157    /// # Errors
158    ///
159    /// If it is failed to parse initial_nodes or the initial nodes has different password, an error is returned.
160    pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<Client> {
161        Builder::new(initial_nodes).open()
162    }
163
164    /// Connect to a redis cluster server with authentication and return a cluster client.
165    /// This does not actually open a connection yet but it performs some basic checks on the URL.
166    ///
167    /// Redis cluster's password must be the same all.
168    /// This function is not use the password of initial nodes.
169    ///
170    /// # Errors
171    ///
172    /// If it is failed to parse initial_nodes, an error is returned.
173    #[deprecated(note = "Please use Builder instead")]
174    pub fn open_with_auth<T: IntoConnectionInfo>(
175        initial_nodes: Vec<T>,
176        password: String,
177    ) -> RedisResult<Client> {
178        Builder::new(initial_nodes).password(password).open()
179    }
180
181    /// Open and get a Redis cluster connection.
182    ///
183    /// # Errors
184    ///
185    /// If it is failed to open connections and to create slots, an error is returned.
186    pub fn get_connection(&self) -> RedisResult<Connection> {
187        Connection::new(
188            self.initial_nodes.clone(),
189            self.readonly,
190            self.password.clone(),
191            self.read_timeout.clone(),
192            self.write_timeout.clone(),
193        )
194    }
195
196    fn open_internal<T: IntoConnectionInfo>(builder: Builder<T>) -> RedisResult<Client> {
197        let mut nodes = Vec::with_capacity(builder.initial_nodes.len());
198        let mut connection_info_password = None::<String>;
199
200        for (index, info) in builder.initial_nodes.into_iter().enumerate() {
201            let info = info.into_connection_info()?;
202            if let ConnectionAddr::Unix(_) = *info.addr {
203                return Err(RedisError::from((ErrorKind::InvalidClientConfig,
204                                             "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
205            }
206
207            if builder.password.is_none() {
208                if index == 0 {
209                    connection_info_password = info.passwd.clone();
210                } else if connection_info_password != info.passwd {
211                    return Err(RedisError::from((
212                        ErrorKind::InvalidClientConfig,
213                        "Cannot use different password among initial nodes.",
214                    )));
215                }
216            }
217
218            nodes.push(info);
219        }
220
221        Ok(Client {
222            initial_nodes: nodes,
223            readonly: builder.readonly,
224            password: builder.password.or(connection_info_password),
225            read_timeout: builder.read_timeout,
226            write_timeout: builder.write_timeout,
227        })
228    }
229}
230
/// This is a connection of Redis cluster.
///
/// Mutable state lives in `RefCell`s so that it can be updated through `&self`.
pub struct Connection {
    // Seed nodes, used again when the connection set is rebuilt after a failure.
    initial_nodes: Vec<ConnectionInfo>,
    // Open node connections keyed by their `redis://host:port` address.
    connections: RefCell<HashMap<String, redis::Connection>>,
    // Slot number -> address of the node that serves the slot.
    slots: RefCell<HashMap<u16, String>>,
    // Whether `request` rebuilds connections on a `ResponseError`.
    auto_reconnect: RefCell<bool>,
    // Settings applied to every node connection that is opened.
    readonly: bool,
    password: Option<String>,
    read_timeout: Option<Duration>,
    write_timeout: Option<Duration>,
}
242
243impl Connection {
244    fn new(
245        initial_nodes: Vec<ConnectionInfo>,
246        readonly: bool,
247        password: Option<String>,
248        read_timeout: Option<Duration>,
249        write_timeout: Option<Duration>,
250    ) -> RedisResult<Connection> {
251        let connections = Self::create_initial_connections(
252            &initial_nodes,
253            readonly,
254            password.clone(),
255            read_timeout.clone(),
256            write_timeout.clone(),
257        )?;
258        let connection = Connection {
259            initial_nodes,
260            connections: RefCell::new(connections),
261            slots: RefCell::new(HashMap::with_capacity(SLOT_SIZE)),
262            auto_reconnect: RefCell::new(true),
263            readonly,
264            password,
265            read_timeout,
266            write_timeout,
267        };
268        connection.refresh_slots()?;
269
270        Ok(connection)
271    }
272
273    /// Set an auto reconnect attribute.
274    /// Default value is true;
275    pub fn set_auto_reconnect(&self, value: bool) {
276        let mut auto_reconnect = self.auto_reconnect.borrow_mut();
277        *auto_reconnect = value;
278    }
279
280    /// Set the write timeout for the connection.
281    ///
282    /// If the provided value is `None`, then `send_packed_command` call will
283    /// block indefinitely. It is an error to pass the zero `Duration` to this
284    /// method.
285    #[deprecated(note = "Please use the Builder function instead")]
286    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
287        let connections = self.connections.borrow();
288        for conn in connections.values() {
289            conn.set_write_timeout(dur)?;
290        }
291        Ok(())
292    }
293
294    /// Set the read timeout for the connection.
295    ///
296    /// If the provided value is `None`, then `recv_response` call will
297    /// block indefinitely. It is an error to pass the zero `Duration` to this
298    /// method.
299    #[deprecated(note = "Please use the Builder function instead")]
300    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
301        let connections = self.connections.borrow();
302        for conn in connections.values() {
303            conn.set_read_timeout(dur)?;
304        }
305        Ok(())
306    }
307
308    /// Check that all connections it has are available.
309    pub fn check_connection(&mut self) -> bool {
310        let mut connections = self.connections.borrow_mut();
311        for conn in connections.values_mut() {
312            if !check_connection(conn) {
313                return false;
314            }
315        }
316        true
317    }
318
319    fn create_initial_connections(
320        initial_nodes: &[ConnectionInfo],
321        readonly: bool,
322        password: Option<String>,
323        read_timeout: Option<Duration>,
324        write_timeout: Option<Duration>,
325    ) -> RedisResult<HashMap<String, redis::Connection>> {
326        let mut connections = HashMap::with_capacity(initial_nodes.len());
327
328        for info in initial_nodes.iter() {
329            let addr = match *info.addr {
330                ConnectionAddr::Tcp(ref host, port) => format!("redis://{}:{}", host, port),
331                _ => panic!("No reach."),
332            };
333
334            if let Ok(mut conn) = connect(
335                info.clone(),
336                readonly,
337                password.clone(),
338                read_timeout.clone(),
339                write_timeout.clone(),
340            ) {
341                if check_connection(&mut conn) {
342                    connections.insert(addr, conn);
343                    break;
344                }
345            }
346        }
347
348        if connections.is_empty() {
349            return Err(RedisError::from((
350                ErrorKind::IoError,
351                "It is failed to check startup nodes.",
352            )));
353        }
354        Ok(connections)
355    }
356
    // Query a node to discover the slot -> node mapping, then rebuild the set of
    // node connections so that it matches the addresses used by the new mapping.
    //
    // Returns a `ResponseError` when the discovered mapping does not cover all
    // `SLOT_SIZE` slots; in that case neither the slot map nor the connections
    // are replaced.
    fn refresh_slots(&self) -> RedisResult<()> {
        let mut slots = self.slots.borrow_mut();
        *slots = {
            let new_slots = if self.readonly {
                // Read-only mode: use a randomly chosen replica, or the master
                // when the slot range has no replica.
                let mut rng = thread_rng();
                self.create_new_slots(|slot_data| {
                    let replicas = slot_data.replicas();
                    if replicas.is_empty() {
                        slot_data.master().to_string()
                    } else {
                        replicas.choose(&mut rng).unwrap().to_string()
                    }
                })
            } else {
                self.create_new_slots(|slot_data| slot_data.master().to_string())
            };

            // An incomplete map (including the empty map produced when no node
            // answered) is rejected.
            if new_slots.len() != SLOT_SIZE {
                return Err(RedisError::from((
                    ErrorKind::ResponseError,
                    "Slot refresh error.",
                )));
            }
            new_slots
        };

        // `create_new_slots` has released its borrow of `connections` by now.
        let mut connections = self.connections.borrow_mut();
        *connections = {
            // Remove dead connections and connect to new nodes if necessary
            let mut new_connections = HashMap::with_capacity(connections.len());

            for addr in slots.values() {
                if !new_connections.contains_key(addr) {
                    // Reuse the existing connection when it still answers.
                    if connections.contains_key(addr) {
                        let mut conn = connections.remove(addr).unwrap();
                        if check_connection(&mut conn) {
                            new_connections.insert(addr.to_string(), conn);
                            continue;
                        }
                    }

                    // Otherwise open a new one; an address that cannot be
                    // reached is left out without reporting an error.
                    if let Ok(mut conn) = connect(
                        addr.as_ref(),
                        self.readonly,
                        self.password.clone(),
                        self.read_timeout.clone(),
                        self.write_timeout.clone(),
                    ) {
                        if check_connection(&mut conn) {
                            new_connections.insert(addr.to_string(), conn);
                        }
                    }
                }
            }
            new_connections
        };

        Ok(())
    }
417
418    fn create_new_slots<F>(&self, mut get_addr: F) -> HashMap<u16, String>
419    where
420        F: FnMut(&Slot) -> String,
421    {
422        let mut connections = self.connections.borrow_mut();
423        let mut new_slots = HashMap::with_capacity(SLOT_SIZE);
424        let mut rng = thread_rng();
425        let len = connections.len();
426        let mut samples = connections.values_mut().choose_multiple(&mut rng, len);
427
428        for mut conn in samples.iter_mut() {
429            if let Ok(slots_data) = get_slots(&mut conn) {
430                for slot_data in slots_data {
431                    for slot in slot_data.start()..=slot_data.end() {
432                        new_slots.insert(slot, get_addr(&slot_data));
433                    }
434                }
435                break;
436            }
437        }
438        new_slots
439    }
440
441    fn get_connection<'a>(
442        &self,
443        connections: &'a mut HashMap<String, redis::Connection>,
444        slot: u16,
445    ) -> (String, &'a mut redis::Connection) {
446        let slots = self.slots.borrow();
447
448        if let Some(addr) = slots.get(&slot) {
449            if connections.contains_key(addr) {
450                return (addr.clone(), connections.get_mut(addr).unwrap());
451            }
452
453            // Create new connection.
454            if let Ok(mut conn) = connect(
455                addr.as_ref(),
456                self.readonly,
457                self.password.clone(),
458                self.read_timeout.clone(),
459                self.write_timeout.clone(),
460            ) {
461                if check_connection(&mut conn) {
462                    return (
463                        addr.to_string(),
464                        connections.entry(addr.to_string()).or_insert(conn),
465                    );
466                }
467            }
468        }
469
470        // Return a random connection
471        get_random_connection(connections, None)
472    }
473
    /// Runs `func` against a node connection, retrying on failure.
    ///
    /// The node is chosen from the slot computed for `cmd`; when no slot can be
    /// computed, or once a node has been excluded after an error, a random
    /// non-excluded connection is used instead. At most 16 attempts are made;
    /// the error of the last attempt is returned.
    fn request<T, F>(&self, cmd: &[u8], mut func: F) -> RedisResult<T>
    where
        F: FnMut(&mut redis::Connection) -> RedisResult<T>,
    {
        let mut retries = 16;
        // Addresses that already failed for this request.
        let mut excludes = HashSet::new();
        let slot = slot_for_packed_command(cmd);

        loop {
            // Get target address and response.
            // The borrow of `connections` is confined to this block so that the
            // error handling below can borrow it again.
            let (addr, res) = {
                let mut connections = self.connections.borrow_mut();
                let (addr, mut conn) = if !excludes.is_empty() || slot.is_none() {
                    get_random_connection(&mut *connections, Some(&excludes))
                } else {
                    self.get_connection(&mut *connections, slot.unwrap())
                };
                (addr, func(&mut conn))
            };

            match res {
                Ok(res) => return Ok(res),
                Err(err) => {
                    retries -= 1;
                    if retries == 0 {
                        return Err(err);
                    }

                    if err.kind() == ErrorKind::ExtensionError {
                        let error_code = err.extension_error_code().unwrap();

                        if error_code == "MOVED" || error_code == "ASK" {
                            // Refresh slots and request again.
                            self.refresh_slots()?;
                            excludes.clear();
                            continue;
                        } else if error_code == "TRYAGAIN" || error_code == "CLUSTERDOWN" {
                            // Sleep and retry.
                            // The delay doubles per failed attempt, starting at
                            // 20 ms and capped at 1280 ms (exponent limited to 7).
                            let sleep_time = 2u64.pow(16 - retries.max(9)) * 10;
                            thread::sleep(Duration::from_millis(sleep_time));
                            excludes.clear();
                            continue;
                        }
                        // Any other extension error falls through to the
                        // exclusion handling below.
                    } else if *self.auto_reconnect.borrow()
                        && err.kind() == ErrorKind::ResponseError
                    {
                        // Reconnect when ResponseError is occurred.
                        let new_connections = Self::create_initial_connections(
                            &self.initial_nodes,
                            self.readonly,
                            self.password.clone(),
                            self.read_timeout.clone(),
                            self.write_timeout.clone(),
                        )?;
                        {
                            // Scoped so the borrow ends before `refresh_slots`
                            // borrows `connections` itself.
                            let mut connections = self.connections.borrow_mut();
                            *connections = new_connections;
                        }
                        self.refresh_slots()?;
                        excludes.clear();
                        continue;
                    }

                    // Do not try the failing node again for this request.
                    excludes.insert(addr);

                    // Give up once every known node has been excluded.
                    let connections = self.connections.borrow();
                    if excludes.len() >= connections.len() {
                        return Err(err);
                    }
                }
            }
        }
    }
547}
548
549impl ConnectionLike for Connection {
550    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
551        self.request(cmd, move |conn| conn.req_packed_command(cmd))
552    }
553
554    fn req_packed_commands(
555        &mut self,
556        cmd: &[u8],
557        offset: usize,
558        count: usize,
559    ) -> RedisResult<Vec<Value>> {
560        self.request(cmd, move |conn| {
561            conn.req_packed_commands(cmd, offset, count)
562        })
563    }
564
565    fn get_db(&self) -> i64 {
566        0
567    }
568}
569
570impl Clone for Client {
571    fn clone(&self) -> Client {
572        Client::open(self.initial_nodes.clone()).unwrap()
573    }
574}
575
576fn connect<T: IntoConnectionInfo>(
577    info: T,
578    readonly: bool,
579    password: Option<String>,
580    read_timeout: Option<Duration>,
581    write_timeout: Option<Duration>,
582) -> RedisResult<redis::Connection>
583where
584    T: std::fmt::Debug,
585{
586    let mut connection_info = info.into_connection_info()?;
587    info!("Checking connection of {:?}", connection_info);
588    connection_info.passwd = password;
589    let client = redis::Client::open(connection_info)?;
590
591    let mut con = client.get_connection()?;
592    if readonly {
593        cmd("READONLY").query(&mut con)?;
594    }
595    con.set_read_timeout(read_timeout)?;
596    con.set_write_timeout(write_timeout)?;
597    Ok(con)
598}
599
600fn check_connection(conn: &mut redis::Connection) -> bool {
601    let mut cmd = Cmd::new();
602    cmd.arg("PING");
603    cmd.query::<String>(conn).is_ok()
604}
605
606fn get_random_connection<'a>(
607    connections: &'a mut HashMap<String, redis::Connection>,
608    excludes: Option<&'a HashSet<String>>,
609) -> (String, &'a mut redis::Connection) {
610    let mut rng = thread_rng();
611    let addr = match excludes {
612        Some(excludes) if excludes.len() < connections.len() => connections
613            .keys()
614            .filter(|key| !excludes.contains(*key))
615            .choose(&mut rng)
616            .unwrap()
617            .to_string(),
618        _ => connections.keys().choose(&mut rng).unwrap().to_string(),
619    };
620
621    let con = connections.get_mut(&addr).unwrap();
622    (addr, con)
623}
624
625fn slot_for_packed_command(cmd: &[u8]) -> Option<u16> {
626    let args = unpack_command(cmd);
627    if args.len() > 1 {
628        let key = match get_hashtag(&args[1]) {
629            Some(tag) => tag,
630            None => &args[1],
631        };
632        Some(State::<XMODEM>::calculate(key) % SLOT_SIZE as u16)
633    } else {
634        None
635    }
636}
637
/// Extracts the hash tag of `key`, if it has one.
///
/// The tag is the text between the first `{` and the first `}` that follows it.
/// Returns `None` when there is no `{`, no `}` after it, or nothing between the
/// two, in which case the whole key is to be hashed.
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
    let open = key.iter().position(|&byte| byte == b'{')?;

    // Search only behind the opening brace; the position found is therefore
    // relative to `tag_start` and equals the length of the tag.
    let tag_start = open + 1;
    let tag_len = key[tag_start..].iter().position(|&byte| byte == b'}')?;

    if tag_len == 0 {
        None
    } else {
        Some(&key[tag_start..tag_start + tag_len])
    }
}
657
/// Splits a packed (RESP-encoded) command into its arguments.
///
/// Bulk strings (`$<len>\r\n<bytes>\r\n`) are read by their declared length, so
/// arguments may start with `*` or `$`, contain line breaks, or hold bytes that
/// are not valid UTF-8. Array headers (`*<n>`) are skipped; when several commands
/// are packed back to back their arguments are returned as one flat list. A line
/// without RESP framing is kept as an argument, minus its line ending.
///
/// Parsing stops at the first malformed or truncated bulk string, and the
/// arguments read up to that point are returned.
fn unpack_command(cmd: &[u8]) -> Vec<Vec<u8>> {
    let mut args: Vec<Vec<u8>> = Vec::new();
    let mut cursor = Cursor::new(cmd);
    let mut header: Vec<u8> = Vec::new();

    loop {
        header.clear();
        match cursor.read_until(b'\n', &mut header) {
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }

        // Drop the line ending ("\n" or "\r\n") of the header line.
        if header.last() == Some(&b'\n') {
            header.pop();
            if header.last() == Some(&b'\r') {
                header.pop();
            }
        }

        match header.first() {
            // Array header: the element count is not needed.
            Some(&b'*') => {}
            Some(&b'$') => {
                let declared = std::str::from_utf8(&header[1..])
                    .ok()
                    .and_then(|digits| digits.parse::<usize>().ok());
                let start = cursor.position() as usize;
                let end = match declared.and_then(|len| start.checked_add(len)) {
                    Some(end) if end <= cmd.len() => end,
                    _ => break,
                };
                args.push(cmd[start..end].to_vec());

                // Continue behind the payload and its trailing "\r\n".
                let next = std::cmp::min(end.saturating_add(2), cmd.len());
                cursor.set_position(next as u64);
            }
            _ => args.push(header.clone()),
        }
    }
    args
}
671
/// One slot range from the cluster's slot listing, with the nodes serving it.
#[derive(Debug)]
struct Slot {
    /// First slot of the range (inclusive).
    start: u16,
    /// Last slot of the range (inclusive).
    end: u16,
    /// Address of the master node.
    master: String,
    /// Addresses of the replica nodes; may be empty.
    replicas: Vec<String>,
}

impl Slot {
    /// First slot of the range.
    pub fn start(&self) -> u16 {
        let Slot { start, .. } = *self;
        start
    }

    /// Last slot of the range.
    pub fn end(&self) -> u16 {
        let Slot { end, .. } = *self;
        end
    }

    /// Address of the master node.
    pub fn master(&self) -> &str {
        self.master.as_str()
    }

    /// Addresses of the replica nodes.
    #[allow(dead_code)]
    pub fn replicas(&self) -> &Vec<String> {
        let Slot { ref replicas, .. } = *self;
        replicas
    }
}
695
696// Get slot data from connection.
697fn get_slots(connection: &mut redis::Connection) -> RedisResult<Vec<Slot>> {
698    let mut cmd = Cmd::new();
699    cmd.arg("CLUSTER").arg("SLOTS");
700    let packed_command = cmd.get_packed_command();
701    let value = connection.req_packed_command(&packed_command)?;
702
703    // Parse response.
704    let mut result = Vec::with_capacity(2);
705
706    if let Value::Bulk(items) = value {
707        let mut iter = items.into_iter();
708        while let Some(Value::Bulk(item)) = iter.next() {
709            if item.len() < 3 {
710                continue;
711            }
712
713            let start = if let Value::Int(start) = item[0] {
714                start as u16
715            } else {
716                continue;
717            };
718
719            let end = if let Value::Int(end) = item[1] {
720                end as u16
721            } else {
722                continue;
723            };
724
725            let mut nodes: Vec<String> = item
726                .into_iter()
727                .skip(2)
728                .filter_map(|node| {
729                    if let Value::Bulk(node) = node {
730                        if node.len() < 2 {
731                            return None;
732                        }
733
734                        let ip = if let Value::Data(ref ip) = node[0] {
735                            String::from_utf8_lossy(ip)
736                        } else {
737                            return None;
738                        };
739                        if ip.is_empty() {
740                            return None;
741                        }
742
743                        let port = if let Value::Int(port) = node[1] {
744                            port
745                        } else {
746                            return None;
747                        };
748                        info!("Parsed host:port slot {}:{}", ip, port);
749                        Some(format!("redis://{}:{}", ip, port))
750                    } else {
751                        None
752                    }
753                })
754                .collect();
755
756            if nodes.is_empty() {
757                continue;
758            }
759
760            let replicas = nodes.split_off(1);
761            result.push(Slot {
762                start,
763                end,
764                master: nodes.pop().unwrap(),
765                replicas,
766            });
767            info!("Added slot {:?}", result.last());
768        }
769    }
770
771    info!("Slots: {:?}", result);
772
773    Ok(result)
774}
775
// Tests for how `Client` / `Builder` decide which password to use.
// They only build clients and never call `get_connection`.
#[cfg(test)]
mod tests {
    use super::{Builder, Client};
    use redis::{ConnectionInfo, IntoConnectionInfo};

    // Three local nodes without a password.
    fn get_connection_data() -> Vec<ConnectionInfo> {
        vec![
            "redis://127.0.0.1:6379".into_connection_info().unwrap(),
            "redis://127.0.0.1:6378".into_connection_info().unwrap(),
            "redis://127.0.0.1:6377".into_connection_info().unwrap(),
        ]
    }

    // Three local nodes that all carry the password "password".
    fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
        vec![
            "redis://:password@127.0.0.1:6379"
                .into_connection_info()
                .unwrap(),
            "redis://:password@127.0.0.1:6378"
                .into_connection_info()
                .unwrap(),
            "redis://:password@127.0.0.1:6377"
                .into_connection_info()
                .unwrap(),
        ]
    }

    // No password anywhere: the client has none.
    #[test]
    fn give_no_password() {
        let client = Client::open(get_connection_data()).unwrap();
        assert_eq!(client.password, None);
    }

    // The password shared by the initial nodes is adopted by the client.
    #[test]
    fn give_password_by_initial_nodes() {
        let client = Client::open(get_connection_data_with_password()).unwrap();
        assert_eq!(client.password, Some("password".to_string()));
    }

    // Initial nodes with differing passwords are rejected.
    #[test]
    fn give_different_password_by_initial_nodes() {
        let result = Client::open(vec![
            "redis://:password1@127.0.0.1:6379",
            "redis://:password2@127.0.0.1:6378",
            "redis://:password3@127.0.0.1:6377",
        ]);
        assert!(result.is_err());
    }

    // A password set on the builder wins over the one in the node URLs.
    #[test]
    fn give_password_by_method() {
        let client = Builder::new(get_connection_data_with_password())
            .password("pass".to_string())
            .open()
            .unwrap();
        assert_eq!(client.password, Some("pass".to_string()));
    }
}