simple_redis 0.6.5

Simple and resilient redis client.
Documentation
use simple_redis;
use simple_redis::{Interrupts, Message};
use std::{thread, time};

#[test]
fn pub_sub() {
    match simple_redis::create("redis://127.0.0.1:6379/") {
        Ok(mut subscriber) => {
            assert!(!subscriber.is_connection_open());

            let mut result = subscriber.subscribe("int_pub_sub");
            assert!(result.is_ok());

            thread::spawn(|| {
                thread::sleep(time::Duration::from_secs(2));

                match simple_redis::create("redis://127.0.0.1:6379/") {
                    Ok(mut publisher) => {
                        assert!(!publisher.is_connection_open());

                        let result = publisher.publish("int_pub_sub", "test pub_sub message");
                        assert!(result.is_ok());

                        assert!(publisher.is_connection_open());
                    }
                    _ => panic!("test error"),
                };
            });

            subscriber
                .fetch_messages(
                    &mut |message: Message| -> bool {
                        let payload: String = message.get_payload().unwrap();
                        assert_eq!(payload, "test pub_sub message");
                        true
                    },
                    &mut || -> Interrupts { Interrupts::new() },
                )
                .unwrap();

            result = subscriber.subscribe("int_pub_sub2");
            assert!(result.is_ok());

            result = subscriber.unsubscribe("int_pub_sub");
            assert!(result.is_ok());

            thread::spawn(|| {
                thread::sleep(time::Duration::from_secs(2));

                match simple_redis::create("redis://127.0.0.1:6379/") {
                    Ok(mut publisher) => {
                        assert!(!publisher.is_connection_open());

                        let mut result = publisher.publish("int_pub_sub", "bad");
                        assert!(result.is_ok());

                        assert!(publisher.is_connection_open());

                        thread::sleep(time::Duration::from_secs(1));

                        result = publisher.publish("int_pub_sub2", "good");
                        assert!(result.is_ok());
                    }
                    _ => panic!("test error"),
                };
            });

            subscriber
                .fetch_messages(
                    &mut |message: Message| -> bool {
                        let payload: String = message.get_payload().unwrap();
                        assert_eq!(payload, "good");
                        true
                    },
                    &mut || -> Interrupts { Interrupts::new() },
                )
                .unwrap();

            thread::spawn(|| {
                thread::sleep(time::Duration::from_secs(2));

                match simple_redis::create("redis://127.0.0.1:6379/") {
                    Ok(mut publisher) => {
                        assert!(!publisher.is_connection_open());

                        let mut result = publisher.publish("int_pub_sub", "bad");
                        assert!(result.is_ok());

                        assert!(publisher.is_connection_open());

                        thread::sleep(time::Duration::from_secs(1));

                        result = publisher.publish("int_pub_sub2", "good");
                        assert!(result.is_ok());
                    }
                    _ => panic!("test error"),
                };
            });

            let mut counter = 0;
            subscriber
                .fetch_messages(
                    &mut |_message: Message| -> bool {
                        panic!("test error");
                    },
                    &mut || -> Interrupts {
                        counter = counter + 1;

                        let mut interrupts = Interrupts::new();
                        interrupts.stop = counter == 5;
                        interrupts.next_polling_time = Some(10);

                        interrupts
                    },
                )
                .unwrap();
        }
        _ => panic!("test error"),
    };
}

#[test]
fn pub_psub() {
    match simple_redis::create("redis://127.0.0.1:6379/") {
        Ok(mut subscriber) => {
            assert!(!subscriber.is_connection_open());

            let mut result = subscriber.psubscribe("int_pub_psub::*");
            assert!(result.is_ok());

            thread::spawn(|| {
                thread::sleep(time::Duration::from_secs(2));

                match simple_redis::create("redis://127.0.0.1:6379/") {
                    Ok(mut publisher) => {
                        assert!(!publisher.is_connection_open());

                        let result = publisher.publish("int_pub_psub::123", "test pub_sub message");
                        assert!(result.is_ok());

                        assert!(publisher.is_connection_open());
                    }
                    _ => panic!("test error"),
                };
            });

            subscriber
                .fetch_messages(
                    &mut |message: Message| -> bool {
                        let payload: String = message.get_payload().unwrap();
                        assert_eq!(payload, "test pub_sub message");
                        true
                    },
                    &mut || -> Interrupts { Interrupts::new() },
                )
                .unwrap();

            result = subscriber.psubscribe("int_pub_psub2::*");
            assert!(result.is_ok());

            result = subscriber.punsubscribe("int_pub_psub::*");
            assert!(result.is_ok());

            thread::spawn(|| {
                thread::sleep(time::Duration::from_secs(2));

                match simple_redis::create("redis://127.0.0.1:6379/") {
                    Ok(mut publisher) => {
                        assert!(!publisher.is_connection_open());

                        let mut result = publisher.publish("int_pub_psub::123", "bad");
                        assert!(result.is_ok());

                        assert!(publisher.is_connection_open());

                        thread::sleep(time::Duration::from_secs(1));

                        result = publisher.publish("int_pub_psub2::123", "good");
                        assert!(result.is_ok());
                    }
                    _ => panic!("test error"),
                };
            });

            subscriber
                .fetch_messages(
                    &mut |message: Message| -> bool {
                        let payload: String = message.get_payload().unwrap();
                        assert_eq!(payload, "good");
                        true
                    },
                    &mut || -> Interrupts { Interrupts::new() },
                )
                .unwrap();
        }
        _ => panic!("test error"),
    };
}