stream_consumer/
stream_consumer.rs1extern crate tokio;
2extern crate futures;
3
4use std::env;
5use std::io;
6use std::net::SocketAddr;
7use std::collections::HashMap;
8use futures::{Future, Stream, Sink};
9use futures::sync::mpsc::{UnboundedSender, unbounded};
10
11use redis_asio::{RedisResult, RedisValue, RedisError, RedisErrorKind, FromRedisValue, from_redis_value};
12use redis_asio::stream::{RedisStream, StreamEntry, EntryId, AckResponse, SubscribeOptions,
13 RedisGroup, TouchGroupOptions, AckOptions};
14
15#[derive(Debug)]
16struct Message(String);
17
18impl FromRedisValue for Message {
21 fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
22 match value {
23 RedisValue::BulkString(data) => {
24 let string = String::from_utf8(data.clone())
25 .map_err(|err|
26 RedisError::new(RedisErrorKind::ParseError,
27 format!("Could not parse message: {}", err))
28 )?;
29 Ok(Message(string))
31 }
32 _ => Err(RedisError::new(RedisErrorKind::ParseError,
33 format!("Could not parse message from invalid RedisValue {:?}", value)))
34 }
35 }
36}
37
38impl Message {
39 fn from_redis_stream_entry(key_values: &HashMap<String, RedisValue>) -> RedisResult<Self> {
43 if key_values.len() != 2 {
44 return Err(RedisError::new(RedisErrorKind::ParseError,
45 "Invalid packet".to_string()));
46 }
47
48 let get_value =
49 |key: &str| match key_values.get(key) {
50 Some(x) => Ok(x),
51 _ => Err(RedisError::new(RedisErrorKind::ParseError,
52 "Invalid packet".to_string()))
53 };
54
55 let packet_type: String = from_redis_value(get_value("type")?)?;
56 match packet_type.as_str() {
57 "Message" => {
58 let data: Message = from_redis_value(get_value("data")?)?;
59 Ok(data)
60 }
61 _ => Err(RedisError::new(RedisErrorKind::ParseError,
62 "Unknown message type".to_string()))
63 }
64 }
65}
66
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 let consumer = create_group
97 .and_then(move |_| {
98 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 let (tx, rx) = unbounded::<EntryId>();
109
110 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}
144
145fn read_stdin() -> String {
146 let mut value = String::new();
147 io::stdin().read_line(&mut value).expect("Expect a valid string");
148 if value.ends_with("\n") {
149 value.pop().expect("Expect no empty string");
150 }
151
152 assert!(!value.is_empty(), "Expect no empty string");
153 value
154}
155
156fn ack_stream_entry(manager: RedisStream, stream: String, group: String, id_to_ack: EntryId)
157 -> impl Future<Item=RedisStream, Error=RedisError> {
158 let options = AckOptions::new(stream.clone(), group.clone(), id_to_ack.clone());
159
160 manager.ack_entry(options)
161 .map(move |(manager, response)| {
162 match response {
163 AckResponse::Ok => println!("{:?} is acknowledged", id_to_ack.to_string()),
164 AckResponse::NotExists =>
165 eprintln!("Couldn't acknowledge {:?}", id_to_ack.to_string())
166 };
167 manager
168 })
169}
170
171fn process_stream_entries(acknowledger: UnboundedSender<EntryId>, entries: Vec<StreamEntry>)
172 -> RedisResult<()> {
173 entries.into_iter()
174 .for_each(move |entry| {
175 let message =
176 Message::from_redis_stream_entry(&entry.values);
177 match message {
178 Ok(message) =>
179 println!("Received message(ID={:?}): {:?}", entry.id.to_string(), message),
180 Err(err) => {
181 eprintln!("{}", err);
182 return;
184 }
185 }
186
187 let future = acknowledger.clone()
189 .send(entry.id)
190 .map(|_| ())
191 .map_err(|_| ());
192 tokio::spawn(future);
193 });
194 Ok(())
195}