stream_consumer/
stream_consumer.rs

1extern crate tokio;
2extern crate futures;
3
4use std::env;
5use std::io;
6use std::net::SocketAddr;
7use std::collections::HashMap;
8use futures::{Future, Stream, Sink};
9use futures::sync::mpsc::{UnboundedSender, unbounded};
10
11use redis_asio::{RedisResult, RedisValue, RedisError, RedisErrorKind, FromRedisValue, from_redis_value};
12use redis_asio::stream::{RedisStream, StreamEntry, EntryId, AckResponse, SubscribeOptions,
13                         RedisGroup, TouchGroupOptions, AckOptions};
14
15#[derive(Debug)]
16struct Message(String);
17
18/// Implements the trait to allow implicit conversion from RedisValue to Message
19/// via from_redis_value()
20impl FromRedisValue for Message {
21    fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
22        match value {
23            RedisValue::BulkString(data) => {
24                let string = String::from_utf8(data.clone())
25                    .map_err(|err|
26                        RedisError::new(RedisErrorKind::ParseError,
27                                        format!("Could not parse message: {}", err))
28                    )?;
29                // Construct a Message from received data
30                Ok(Message(string))
31            }
32            _ => Err(RedisError::new(RedisErrorKind::ParseError,
33                                     format!("Could not parse message from invalid RedisValue {:?}", value)))
34        }
35    }
36}
37
38impl Message {
39    /// Tries to convert a Message from HashMap<String, RedisValue> (represents a Redis Stream entry).
40    /// The entry should have the following structure:
41    /// "type Message data \"Some data\""
42    fn from_redis_stream_entry(key_values: &HashMap<String, RedisValue>) -> RedisResult<Self> {
43        if key_values.len() != 2 {
44            return Err(RedisError::new(RedisErrorKind::ParseError,
45                                       "Invalid packet".to_string()));
46        }
47
48        let get_value =
49            |key: &str| match key_values.get(key) {
50                Some(x) => Ok(x),
51                _ => Err(RedisError::new(RedisErrorKind::ParseError,
52                                         "Invalid packet".to_string()))
53            };
54
55        let packet_type: String = from_redis_value(get_value("type")?)?;
56        match packet_type.as_str() {
57            "Message" => {
58                let data: Message = from_redis_value(get_value("data")?)?;
59                Ok(data)
60            }
61            _ => Err(RedisError::new(RedisErrorKind::ParseError,
62                                     "Unknown message type".to_string()))
63        }
64    }
65}
66
67fn main() {
68    println!("Consumer example has started");
69    println!("Please enter a STREAM to listen on");
70    let stream_name = read_stdin();
71    println!("Please enter a GROUP");
72    let group_name = read_stdin();
73    println!("Please enter a CONSUMER");
74    let consumer_name = read_stdin();
75
76    let redis_address = env::var("REDIS_URL")
77        .unwrap_or("127.0.0.1:6379".to_string())
78        .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82    // Try to create a group.
83    // If the group exists already, the future will not be set into an error.
84    // The create_group variable is the Future<Item=(), Error=()>.
85    let create_group = RedisStream::connect(&redis_address)
86        .and_then(move |con|
87            // Create group if the one does not exists yet
88            con.touch_group(touch_options))
89        .then(|_| -> RedisResult<()> { Ok(()) });
90
91    // Start the consuming after the group has been checked.
92    //
93    // Note nothing will happen if the previous future has failed.
94    // The consumer variable in a result is the Future<Item=(), Error=()>.
95    // The Item and Error are required by tokio::run().
96    let consumer = create_group
97        .and_then(move |_| {
98            // Create two connections to the Redis Server:
99            // first will be used for managing (send Acknowledge request),
100            // second will be used for receive entries from Redis Server.
101            let manager = RedisStream::connect(&redis_address);
102            let consumer = RedisStream::connect(&redis_address);
103            consumer.join(manager)
104        })
105        .and_then(move |(connection, manager)| {
106            // Create an unbounded channel to allow the consumer notifies the manager
107            // about received and unacknowledged yet entries.
108            let (tx, rx) = unbounded::<EntryId>();
109
110            // Copy of stream_name and group_name to move it into ack_entry future.
111            let stream = stream_name.clone();
112            let group = group_name.clone();
113
114            let ack_entry = rx
115                .map_err(|_|
116                    RedisError::new(RedisErrorKind::InternalError,
117                                    "Something went wrong with UnboundedChannel".to_string()))
118                // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119                .fold(manager, move |manager, id_to_ack|
120                    ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121                .map(|_| ())
122                .map_err(|_| ());
123
124            // Spawn the ack_entry future to be handled separately from the process_entry future.
125            tokio::spawn(ack_entry);
126
127            let group = RedisGroup::new(group_name, consumer_name);
128            let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130            // Subscribe to a Redis stream, processes any incoming entries and sends
131            // entry ids of success processed entries to the manager via the channel sender (tx).
132            let process_entry =
133                connection.subscribe(options)
134                    .and_then(move |subscribe|
135                        subscribe.for_each(move |entries|
136                            process_stream_entries(tx.clone(), entries)));
137            // Return and run later the process_entry future.
138            process_entry
139        })
140        .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142    tokio::run(consumer);
143}
144
145fn read_stdin() -> String {
146    let mut value = String::new();
147    io::stdin().read_line(&mut value).expect("Expect a valid string");
148    if value.ends_with("\n") {
149        value.pop().expect("Expect no empty string");
150    }
151
152    assert!(!value.is_empty(), "Expect no empty string");
153    value
154}
155
156fn ack_stream_entry(manager: RedisStream, stream: String, group: String, id_to_ack: EntryId)
157                    -> impl Future<Item=RedisStream, Error=RedisError> {
158    let options = AckOptions::new(stream.clone(), group.clone(), id_to_ack.clone());
159
160    manager.ack_entry(options)
161        .map(move |(manager, response)| {
162            match response {
163                AckResponse::Ok => println!("{:?} is acknowledged", id_to_ack.to_string()),
164                AckResponse::NotExists =>
165                    eprintln!("Couldn't acknowledge {:?}", id_to_ack.to_string())
166            };
167            manager
168        })
169}
170
171fn process_stream_entries(acknowledger: UnboundedSender<EntryId>, entries: Vec<StreamEntry>)
172                          -> RedisResult<()> {
173    entries.into_iter()
174        .for_each(move |entry| {
175            let message =
176                Message::from_redis_stream_entry(&entry.values);
177            match message {
178                Ok(message) =>
179                    println!("Received message(ID={:?}): {:?}", entry.id.to_string(), message),
180                Err(err) => {
181                    eprintln!("{}", err);
182                    // do not acknowledge the message
183                    return;
184                }
185            }
186
187            // Notifies the manager about the received and processed entry.
188            let future = acknowledger.clone()
189                .send(entry.id)
190                .map(|_| ())
191                .map_err(|_| ());
192            tokio::spawn(future);
193        });
194    Ok(())
195}