redis_asio/stream/
stream.rs

1use crate::{RedisValue, RedisCoreConnection, RedisError, RedisErrorKind,
2            IntoRedisArgument, from_redis_value};
3use super::*;
4
5use std::error::Error;
6use std::net::SocketAddr;
7use std::collections::HashMap;
8use futures::{Future, Sink};
9
10
11/// The structure represents a Redis connection that provides interface for
12/// working with Redis Stream "https://redis.io/topics/streams-intro".
13///
14/// The structure wraps an actual `RedisCoreConnection`,
15/// converts RedisValue into and from considered structures that are easier
16/// to use in Redis Stream context.
17///
18/// See more examples in `examples` directory.
19pub struct RedisStream {
20    connection: RedisCoreConnection,
21}
22
23impl RedisStream {
24    /// Open a connection to Redis server and wrap it into `RedisStream`,
25    /// that will be available in the future.
26    pub fn connect(addr: &SocketAddr)
27                   -> impl Future<Item=RedisStream, Error=RedisError> + Send + 'static {
28        RedisCoreConnection::connect(addr)
29            .map(|connection| Self { connection })
30    }
31
32    /// Send an entry that will be constructed by options and pairs of key-values.
33    ///
34    /// # Example
35    ///
36    /// ```rust,no_run
37    /// use std::net::SocketAddr;
38    /// use std::collections::HashMap;
39    /// use futures::Future;
40    /// use redis_asio::{RedisArgument, IntoRedisArgument};
41    /// use redis_asio::stream::{RedisStream, SendEntryOptions, EntryId};
42    ///
43    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
44    /// let send_options = SendEntryOptions::new("mystream".to_string());
45    ///
46    /// let mut request: HashMap<String, RedisArgument> = HashMap::new();
47    /// request.insert("type".to_string(), 3i32.into_redis_argument());
48    /// request.insert("data".to_string(), "Hello, world!".into_redis_argument());
49    ///
50    /// let future = RedisStream::connect(address)
51    ///     .and_then(move |stream: RedisStream| {
52    ///         // HashMap<String, RedisArgument> satisfies the
53    ///         // HashMap<String, ToRedisArgument>
54    ///         stream.send_entry(send_options, request)
55    ///     })
56    ///     .map(|(_, inserted_entry_id): (RedisStream, EntryId)| {
57    ///         println!("{:?} has sent", inserted_entry_id.to_string());
58    ///     })
59    ///     .map_err(|err| eprintln!("something went wrong: {}", err));
60    /// tokio::run(future);
61    /// ```
62    pub fn send_entry<T>(self, options: SendEntryOptions, key_values: HashMap<String, T>)
63                         -> impl Future<Item=(RedisStream, EntryId), Error=RedisError> + Send + 'static
64        where T: IntoRedisArgument {
65        self.connection.send(add_command(options, key_values))
66            .and_then(|(connection, response)| {
67                let entry_id_string = from_redis_value(&response)?;
68                let entry_id = EntryId::from_string(entry_id_string)?;
69                Ok((Self { connection }, entry_id))
70            })
71    }
72
73    /// Read entries with IDs greater than specified `start_id`.
74    ///
75    /// # Example
76    ///
77    /// ```rust,no_run
78    /// use std::net::SocketAddr;
79    /// use std::collections::HashMap;
80    /// use futures::Future;
81    /// use redis_asio::stream::{RedisStream, ReadExplicitOptions, EntryId,
82    ///                          StreamEntry};
83    ///
84    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
85    /// // start_id = "0-0" means get any entries
86    /// let mut read_options =
87    ///     ReadExplicitOptions::new("stream1".to_string(), EntryId::new(0, 0), 10);
88    /// read_options.add_stream("stream2".to_string(), EntryId::new(0, 0));
89    ///
90    /// let future = RedisStream::connect(address)
91    ///     .and_then(move |stream: RedisStream| {
92    ///         stream.read_explicit(read_options)
93    ///     })
94    ///     .map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
95    ///         for entry in entries.into_iter() {
96    ///             println!("Received: {:?}", entry);
97    ///         }
98    ///     })
99    ///     .map_err(|err| eprintln!("something went wrong: {}", err));
100    /// tokio::run(future);
101    /// ```
102    pub fn read_explicit(self, options: ReadExplicitOptions)
103                         -> impl Future<
104                             Item=(RedisStream, Vec<StreamEntry>),
105                             Error=RedisError>
106                         + Send + 'static {
107        self.connection.send(read_explicit_cmd(options))
108            .and_then(|(connection, response)|
109                Ok((RedisStream { connection }, parse_stream_entries(response)?))
110            )
111    }
112
113    /// Get entries in specified range.
114    ///
115    /// # Example
116    ///
117    /// ```rust,no_run
118    /// use std::net::SocketAddr;
119    /// use futures::Future;
120    /// use redis_asio::stream::{RedisStream, RangeOptions, RangeType, RangeEntry};
121    ///
122    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
123    /// let range_options =
124    ///     RangeOptions::new("stream1".to_string(), 10, RangeType::Any).unwrap();
125    ///
126    /// let future = RedisStream::connect(address)
127    ///     .and_then(move |stream: RedisStream| {
128    ///         stream.range(range_options)
129    ///     })
130    ///     .map(|(_, entries): (RedisStream, Vec<RangeEntry>)| {
131    ///         for entry in entries.into_iter() {
132    ///             println!("Received: {:?}", entry);
133    ///         }
134    ///     })
135    ///     .map_err(|err| eprintln!("something went wrong: {}", err));
136    /// tokio::run(future);
137    /// ```
138    pub fn range(self, options: RangeOptions)
139                 -> impl Future<
140                     Item=(RedisStream, Vec<RangeEntry>),
141                     Error=RedisError>
142                 + Send + 'static {
143        self.connection.send(range_cmd(options))
144            .and_then(|(connection, response)|
145                Ok((RedisStream { connection }, parse_range_entries(response)?))
146            )
147    }
148
149    /// Subscribe to a Redis stream and process all incoming entries.
150    /// Redis Streams requires to send XREAD/XREADGROUP requests every time
151    /// the client receives a response on the previous,
152    /// in other words Redis Streams does not provide an interface to subscribe
153    /// to a Redis stream.
154    ///
155    /// In the Crate the subscription is possible by hidden requests sending
156    /// within the Crate engine.
157    ///
158    /// Request that will be sent to get new entries in the following example:
159    /// "XREADGROUP GROUP mygroup Bob BLOCK 0 STREAMS mystream <"
160    ///
161    /// # Example
162    ///
163    /// ```rust,no_run
164    /// use std::net::SocketAddr;
165    /// use futures::{Future, Stream};
166    /// use redis_asio::stream::{RedisStream, SubscribeOptions, StreamEntry,
167    ///                          RedisGroup};
168    ///
169    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
170    /// let group_info = RedisGroup::new("mygroup".to_string(), "Bob".to_string());
171    /// let subscribe_options =
172    ///     SubscribeOptions::with_group(vec!["stream1".to_string()], group_info);
173    ///
174    /// let future = RedisStream::connect(address)
175    ///     .and_then(move |stream: RedisStream| {
176    ///         stream.subscribe(subscribe_options)
177    ///     })
178    ///     .and_then(|subscribe| /*:Subscribe*/ {
179    ///         subscribe.for_each(|entries: Vec<StreamEntry>| {
180    ///             for entry in entries.into_iter() {
181    ///                 println!("Received: {:?}", entry);
182    ///             }
183    ///             Ok(())
184    ///         })
185    ///     })
186    ///     .map_err(|err| eprintln!("something went wrong: {}", err));
187    /// tokio::run(future);
188    /// ```
189    pub fn subscribe(self, options: SubscribeOptions)
190                     -> impl Future<Item=Subscribe, Error=RedisError> + Send + 'static {
191        let RedisCoreConnection { sender, receiver } = self.connection;
192
193        // send first subscription request
194        sender
195            .send(subscribe_cmd(options.clone()))
196            .map(move |sender| {
197                // run recursive server message processing
198                Subscribe {
199                    stream: Box::new(subscribe(receiver, sender, options))
200                }
201            })
202    }
203
204    /// Acknowledge an entry by its ID.
205    ///
206    /// # Example
207    /// ```rust,no_run
208    /// use std::net::SocketAddr;
209    /// use futures::{Future, Stream};
210    /// use redis_asio::stream::{RedisStream, AckOptions, AckResponse, EntryId};
211    ///
212    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
213    ///    let ack_options =
214    ///        AckOptions::new(
215    ///            "mystream".to_string(),
216    ///            "mygroup".to_string(),
217    ///            EntryId::new(0, 0));
218    ///
219    ///    let future = RedisStream::connect(address)
220    ///        .and_then(move |stream: RedisStream| {
221    ///            stream.ack_entry(ack_options)
222    ///        })
223    ///        .map(|(_, response): (RedisStream, AckResponse)| {
224    ///            assert_eq!(AckResponse::Ok, response);
225    ///        })
226    ///        .map_err(|err| eprintln!("something went wrong: {}", err));
227    ///    tokio::run(future);
228    /// ```
229    pub fn ack_entry(self, options: AckOptions)
230                     -> impl Future<Item=(Self, AckResponse), Error=RedisError> + Send + 'static {
231        self.connection.send(ack_entry_command(options))
232            .and_then(|(connection, response)| {
233                let response = match response {
234                    RedisValue::Int(x) => AckResponse::new(x),
235                    _ => return Err(RedisError::new(RedisErrorKind::ParseError, "Expect integer reply on XACK request".to_string())),
236                };
237                Ok((RedisStream { connection }, response))
238            })
239    }
240
241    /// Get entries that was not acknowledged but was sent to specified consumer.
242    ///
243    /// # Example
244    /// ```rust,no_run
245    /// use std::net::SocketAddr;
246    /// use futures::Future;
247    /// use redis_asio::stream::{RedisStream, PendingOptions, StreamEntry, EntryId};
248    ///
249    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
250    ///    let pending_options =
251    ///        PendingOptions::new(
252    ///            "mystream".to_string(),
253    ///            "mygroup".to_string(),
254    ///            "Bob".to_string(),
255    ///            EntryId::new(0, 0)).unwrap();
256    ///
257    ///    let future = RedisStream::connect(address)
258    ///        .and_then(move |stream: RedisStream| {
259    ///            stream.pending_entries(pending_options)
260    ///        })
261    ///        .map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
262    ///            for entry in entries.into_iter() {
263    ///                println!("Received: {:?}", entry);
264    ///            }
265    ///        })
266    ///        .map_err(|err| eprintln!("something went wrong: {}", err));
267    ///    tokio::run(future);
268    /// ```
269    pub fn pending_entries(self, options: PendingOptions)
270                           -> impl Future<Item=(Self, Vec<StreamEntry>), Error=RedisError> + Send + 'static {
271        self.connection.send(pending_list_command(options))
272            .and_then(|(connection, response)| {
273                Ok((RedisStream { connection }, parse_stream_entries(response)?))
274            })
275    }
276
277    /// Try to create a group. If the group exists already, do not return an error.
278    ///
279    /// # Example
280    /// ```rust,no_run
281    /// use std::net::SocketAddr;
282    /// use futures::Future;
283    /// use redis_asio::stream::{RedisStream, TouchGroupOptions, StreamEntry,
284    ///                          EntryId};
285    ///
286    /// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
287    ///    let touch_options =
288    ///        TouchGroupOptions::new("mystream".to_string(), "mygroup".to_string());
289    ///
290    ///    let future = RedisStream::connect(&address)
291    ///        .and_then(move |con|
292    ///            // create group if the one does not exists yet
293    ///            con.touch_group(touch_options))
294    ///        // ignore an error if the group exists already
295    ///        .then(|_| -> Result<(), ()> { Ok(()) });
296    ///    tokio::run(future);
297    /// ```
298    pub fn touch_group(self, options: TouchGroupOptions)
299                       -> impl Future<Item=(), Error=RedisError> + Send + 'static {
300        self.connection.send(touch_group_command(options))
301            .then(|res| {
302                match res {
303                    // do not keep the connection in anyway because we could receive BUSYGROUP from server
304                    Ok((_connection, _)) => Ok(()),
305                    Err(err) => {
306                        if err.error == RedisErrorKind::ReceiveError
307                            && err.description().contains("BUSYGROUP") {
308                            return Ok(());
309                        }
310                        Err(err)
311                    }
312                }
313            })
314    }
315}