redis_asio/stream/stream.rs
1use crate::{RedisValue, RedisCoreConnection, RedisError, RedisErrorKind,
2 IntoRedisArgument, from_redis_value};
3use super::*;
4
5use std::error::Error;
6use std::net::SocketAddr;
7use std::collections::HashMap;
8use futures::{Future, Sink};
9
10
11/// The structure represents a Redis connection that provides interface for
12/// working with Redis Stream "https://redis.io/topics/streams-intro".
13///
14/// The structure wraps an actual `RedisCoreConnection`,
15/// converts RedisValue into and from considered structures that are easier
16/// to use in Redis Stream context.
17///
18/// See more examples in `examples` directory.
19pub struct RedisStream {
20 connection: RedisCoreConnection,
21}
22
23impl RedisStream {
24 /// Open a connection to Redis server and wrap it into `RedisStream`,
25 /// that will be available in the future.
26 pub fn connect(addr: &SocketAddr)
27 -> impl Future<Item=RedisStream, Error=RedisError> + Send + 'static {
28 RedisCoreConnection::connect(addr)
29 .map(|connection| Self { connection })
30 }
31
32 /// Send an entry that will be constructed by options and pairs of key-values.
33 ///
34 /// # Example
35 ///
36 /// ```rust,no_run
37 /// use std::net::SocketAddr;
38 /// use std::collections::HashMap;
39 /// use futures::Future;
40 /// use redis_asio::{RedisArgument, IntoRedisArgument};
41 /// use redis_asio::stream::{RedisStream, SendEntryOptions, EntryId};
42 ///
43 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
44 /// let send_options = SendEntryOptions::new("mystream".to_string());
45 ///
46 /// let mut request: HashMap<String, RedisArgument> = HashMap::new();
47 /// request.insert("type".to_string(), 3i32.into_redis_argument());
48 /// request.insert("data".to_string(), "Hello, world!".into_redis_argument());
49 ///
50 /// let future = RedisStream::connect(address)
51 /// .and_then(move |stream: RedisStream| {
52 /// // HashMap<String, RedisArgument> satisfies the
53 /// // HashMap<String, ToRedisArgument>
54 /// stream.send_entry(send_options, request)
55 /// })
56 /// .map(|(_, inserted_entry_id): (RedisStream, EntryId)| {
57 /// println!("{:?} has sent", inserted_entry_id.to_string());
58 /// })
59 /// .map_err(|err| eprintln!("something went wrong: {}", err));
60 /// tokio::run(future);
61 /// ```
62 pub fn send_entry<T>(self, options: SendEntryOptions, key_values: HashMap<String, T>)
63 -> impl Future<Item=(RedisStream, EntryId), Error=RedisError> + Send + 'static
64 where T: IntoRedisArgument {
65 self.connection.send(add_command(options, key_values))
66 .and_then(|(connection, response)| {
67 let entry_id_string = from_redis_value(&response)?;
68 let entry_id = EntryId::from_string(entry_id_string)?;
69 Ok((Self { connection }, entry_id))
70 })
71 }
72
73 /// Read entries with IDs greater than specified `start_id`.
74 ///
75 /// # Example
76 ///
77 /// ```rust,no_run
78 /// use std::net::SocketAddr;
79 /// use std::collections::HashMap;
80 /// use futures::Future;
81 /// use redis_asio::stream::{RedisStream, ReadExplicitOptions, EntryId,
82 /// StreamEntry};
83 ///
84 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
85 /// // start_id = "0-0" means get any entries
86 /// let mut read_options =
87 /// ReadExplicitOptions::new("stream1".to_string(), EntryId::new(0, 0), 10);
88 /// read_options.add_stream("stream2".to_string(), EntryId::new(0, 0));
89 ///
90 /// let future = RedisStream::connect(address)
91 /// .and_then(move |stream: RedisStream| {
92 /// stream.read_explicit(read_options)
93 /// })
94 /// .map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
95 /// for entry in entries.into_iter() {
96 /// println!("Received: {:?}", entry);
97 /// }
98 /// })
99 /// .map_err(|err| eprintln!("something went wrong: {}", err));
100 /// tokio::run(future);
101 /// ```
102 pub fn read_explicit(self, options: ReadExplicitOptions)
103 -> impl Future<
104 Item=(RedisStream, Vec<StreamEntry>),
105 Error=RedisError>
106 + Send + 'static {
107 self.connection.send(read_explicit_cmd(options))
108 .and_then(|(connection, response)|
109 Ok((RedisStream { connection }, parse_stream_entries(response)?))
110 )
111 }
112
113 /// Get entries in specified range.
114 ///
115 /// # Example
116 ///
117 /// ```rust,no_run
118 /// use std::net::SocketAddr;
119 /// use futures::Future;
120 /// use redis_asio::stream::{RedisStream, RangeOptions, RangeType, RangeEntry};
121 ///
122 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
123 /// let range_options =
124 /// RangeOptions::new("stream1".to_string(), 10, RangeType::Any).unwrap();
125 ///
126 /// let future = RedisStream::connect(address)
127 /// .and_then(move |stream: RedisStream| {
128 /// stream.range(range_options)
129 /// })
130 /// .map(|(_, entries): (RedisStream, Vec<RangeEntry>)| {
131 /// for entry in entries.into_iter() {
132 /// println!("Received: {:?}", entry);
133 /// }
134 /// })
135 /// .map_err(|err| eprintln!("something went wrong: {}", err));
136 /// tokio::run(future);
137 /// ```
138 pub fn range(self, options: RangeOptions)
139 -> impl Future<
140 Item=(RedisStream, Vec<RangeEntry>),
141 Error=RedisError>
142 + Send + 'static {
143 self.connection.send(range_cmd(options))
144 .and_then(|(connection, response)|
145 Ok((RedisStream { connection }, parse_range_entries(response)?))
146 )
147 }
148
149 /// Subscribe to a Redis stream and process all incoming entries.
150 /// Redis Streams requires to send XREAD/XREADGROUP requests every time
151 /// the client receives a response on the previous,
152 /// in other words Redis Streams does not provide an interface to subscribe
153 /// to a Redis stream.
154 ///
155 /// In the Crate the subscription is possible by hidden requests sending
156 /// within the Crate engine.
157 ///
158 /// Request that will be sent to get new entries in the following example:
159 /// "XREADGROUP GROUP mygroup Bob BLOCK 0 STREAMS mystream <"
160 ///
161 /// # Example
162 ///
163 /// ```rust,no_run
164 /// use std::net::SocketAddr;
165 /// use futures::{Future, Stream};
166 /// use redis_asio::stream::{RedisStream, SubscribeOptions, StreamEntry,
167 /// RedisGroup};
168 ///
169 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
170 /// let group_info = RedisGroup::new("mygroup".to_string(), "Bob".to_string());
171 /// let subscribe_options =
172 /// SubscribeOptions::with_group(vec!["stream1".to_string()], group_info);
173 ///
174 /// let future = RedisStream::connect(address)
175 /// .and_then(move |stream: RedisStream| {
176 /// stream.subscribe(subscribe_options)
177 /// })
178 /// .and_then(|subscribe| /*:Subscribe*/ {
179 /// subscribe.for_each(|entries: Vec<StreamEntry>| {
180 /// for entry in entries.into_iter() {
181 /// println!("Received: {:?}", entry);
182 /// }
183 /// Ok(())
184 /// })
185 /// })
186 /// .map_err(|err| eprintln!("something went wrong: {}", err));
187 /// tokio::run(future);
188 /// ```
189 pub fn subscribe(self, options: SubscribeOptions)
190 -> impl Future<Item=Subscribe, Error=RedisError> + Send + 'static {
191 let RedisCoreConnection { sender, receiver } = self.connection;
192
193 // send first subscription request
194 sender
195 .send(subscribe_cmd(options.clone()))
196 .map(move |sender| {
197 // run recursive server message processing
198 Subscribe {
199 stream: Box::new(subscribe(receiver, sender, options))
200 }
201 })
202 }
203
204 /// Acknowledge an entry by its ID.
205 ///
206 /// # Example
207 /// ```rust,no_run
208 /// use std::net::SocketAddr;
209 /// use futures::{Future, Stream};
210 /// use redis_asio::stream::{RedisStream, AckOptions, AckResponse, EntryId};
211 ///
212 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
213 /// let ack_options =
214 /// AckOptions::new(
215 /// "mystream".to_string(),
216 /// "mygroup".to_string(),
217 /// EntryId::new(0, 0));
218 ///
219 /// let future = RedisStream::connect(address)
220 /// .and_then(move |stream: RedisStream| {
221 /// stream.ack_entry(ack_options)
222 /// })
223 /// .map(|(_, response): (RedisStream, AckResponse)| {
224 /// assert_eq!(AckResponse::Ok, response);
225 /// })
226 /// .map_err(|err| eprintln!("something went wrong: {}", err));
227 /// tokio::run(future);
228 /// ```
229 pub fn ack_entry(self, options: AckOptions)
230 -> impl Future<Item=(Self, AckResponse), Error=RedisError> + Send + 'static {
231 self.connection.send(ack_entry_command(options))
232 .and_then(|(connection, response)| {
233 let response = match response {
234 RedisValue::Int(x) => AckResponse::new(x),
235 _ => return Err(RedisError::new(RedisErrorKind::ParseError, "Expect integer reply on XACK request".to_string())),
236 };
237 Ok((RedisStream { connection }, response))
238 })
239 }
240
241 /// Get entries that was not acknowledged but was sent to specified consumer.
242 ///
243 /// # Example
244 /// ```rust,no_run
245 /// use std::net::SocketAddr;
246 /// use futures::Future;
247 /// use redis_asio::stream::{RedisStream, PendingOptions, StreamEntry, EntryId};
248 ///
249 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
250 /// let pending_options =
251 /// PendingOptions::new(
252 /// "mystream".to_string(),
253 /// "mygroup".to_string(),
254 /// "Bob".to_string(),
255 /// EntryId::new(0, 0)).unwrap();
256 ///
257 /// let future = RedisStream::connect(address)
258 /// .and_then(move |stream: RedisStream| {
259 /// stream.pending_entries(pending_options)
260 /// })
261 /// .map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
262 /// for entry in entries.into_iter() {
263 /// println!("Received: {:?}", entry);
264 /// }
265 /// })
266 /// .map_err(|err| eprintln!("something went wrong: {}", err));
267 /// tokio::run(future);
268 /// ```
269 pub fn pending_entries(self, options: PendingOptions)
270 -> impl Future<Item=(Self, Vec<StreamEntry>), Error=RedisError> + Send + 'static {
271 self.connection.send(pending_list_command(options))
272 .and_then(|(connection, response)| {
273 Ok((RedisStream { connection }, parse_stream_entries(response)?))
274 })
275 }
276
277 /// Try to create a group. If the group exists already, do not return an error.
278 ///
279 /// # Example
280 /// ```rust,no_run
281 /// use std::net::SocketAddr;
282 /// use futures::Future;
283 /// use redis_asio::stream::{RedisStream, TouchGroupOptions, StreamEntry,
284 /// EntryId};
285 ///
286 /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
287 /// let touch_options =
288 /// TouchGroupOptions::new("mystream".to_string(), "mygroup".to_string());
289 ///
290 /// let future = RedisStream::connect(&address)
291 /// .and_then(move |con|
292 /// // create group if the one does not exists yet
293 /// con.touch_group(touch_options))
294 /// // ignore an error if the group exists already
295 /// .then(|_| -> Result<(), ()> { Ok(()) });
296 /// tokio::run(future);
297 /// ```
298 pub fn touch_group(self, options: TouchGroupOptions)
299 -> impl Future<Item=(), Error=RedisError> + Send + 'static {
300 self.connection.send(touch_group_command(options))
301 .then(|res| {
302 match res {
303 // do not keep the connection in anyway because we could receive BUSYGROUP from server
304 Ok((_connection, _)) => Ok(()),
305 Err(err) => {
306 if err.error == RedisErrorKind::ReceiveError
307 && err.description().contains("BUSYGROUP") {
308 return Ok(());
309 }
310 Err(err)
311 }
312 }
313 })
314 }
315}