redis_asio/stream/
consume.rs

1use crate::{RedisResult, RedisValue, RedisError, RedisErrorKind, RespInternalValue,
2            RedisCommand, command};
3use super::{EntryId, RangeType, StreamEntry, parse_stream_entries};
4use futures::{Stream, Future, Sink};
5use futures::sync::mpsc::{channel, Sender, Receiver};
6use futures::Async;
7
8/// Set of options that are required by `RedisStream::subscribe()`
9#[derive(Clone)]
10pub struct SubscribeOptions {
11    /// List of listen streams
12    pub(crate) streams: Vec<String>,
13    /// Optional group info
14    pub(crate) group: Option<RedisGroup>,
15}
16
17/// Set of options that are required by `RedisStream::read_explicit()`
18#[derive(Clone)]
19pub struct ReadExplicitOptions {
20    /// Get entries from the following streams with ID greater than the corresponding entry IDs
21    pub(crate) streams: Vec<(String, EntryId)>,
22    /// Max count of entries
23    pub(crate) count: u16,
24}
25
26/// Set of options that are required by `RedisStream::range()`
27#[derive(Clone)]
28pub struct RangeOptions {
29    /// Stream name
30    pub(crate) stream: String,
31    /// Max count of entries
32    pub(crate) count: u16,
33    /// Get entries with ID in the range
34    pub(crate) range: RangeType,
35}
36
/// Pair of a consumer group name and a consumer name.
///
/// Besides `Clone`, the common `Debug`, `PartialEq` and `Eq` traits are
/// derived so the value can be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RedisGroup {
    /// Group name
    pub(crate) group: String,
    /// Consumer name
    pub(crate) consumer: String,
}
45
46/// The `Stream<Item=Vec<StreamEntry>, Error=RedisError>` wrapper
47pub struct Subscribe {
48    pub(crate) stream: Box<dyn Stream<Item=RedisValue, Error=RedisError> + Send + 'static>,
49}
50
51impl Stream for Subscribe {
52    type Item = Vec<StreamEntry>;
53    type Error = RedisError;
54
55    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
56        self.stream.poll()
57            .and_then(|value| {
58                let value = match value {
59                    Async::Ready(x) => x,
60                    _ => return Ok(Async::NotReady)
61                };
62                let value = match value {
63                    Some(x) => x,
64                    _ => return Ok(Async::Ready(None)),
65                };
66
67                parse_stream_entries(value)
68                    .map(|stream_entries| Async::Ready(Some(stream_entries)))
69            })
70    }
71}
72
73impl SubscribeOptions {
74    pub fn new(stream: Vec<String>) -> SubscribeOptions {
75        let group: Option<RedisGroup> = None;
76        SubscribeOptions { streams: stream, group }
77    }
78
79    pub fn with_group(stream: Vec<String>, group: RedisGroup) -> SubscribeOptions {
80        let group = Some(group);
81        SubscribeOptions { streams: stream, group }
82    }
83}
84
85impl ReadExplicitOptions {
86    pub fn new(stream: String, start_id: EntryId, count: u16) -> ReadExplicitOptions {
87        let streams = vec![(stream, start_id)];
88        ReadExplicitOptions { streams, count }
89    }
90
91    pub fn add_stream(&mut self, stream: String, start_id: EntryId) {
92        self.streams.push((stream, start_id))
93    }
94}
95
96impl RangeOptions {
97    pub fn new(stream: String, count: u16, range: RangeType) -> RedisResult<RangeOptions> {
98        if !range.is_valid() {
99            return Err(
100                RedisError::new(RedisErrorKind::InvalidOptions,
101                                format!("Left bound should be less than right bound")));
102        }
103
104        Ok(RangeOptions { stream, count, range })
105    }
106}
107
108impl RedisGroup {
109    pub fn new(group: String, consumer: String) -> RedisGroup {
110        RedisGroup { group, consumer }
111    }
112}
113
/// Message passed over the internal channel between the part that processes
/// server responses and the part that sends commands to the server.
enum StreamInternalCommand {
    /// Ask the sending part to issue the next subscribe command.
    ListenNextMessage,
}
117
118pub(crate) fn subscribe<F, T>(from_srv: F, to_srv: T, options: SubscribeOptions)
119                              -> impl Stream<Item=RedisValue, Error=RedisError> + Send + 'static
120    where F: Stream<Item=RespInternalValue, Error=RedisError> + Send + 'static,
121          T: Sink<SinkItem=RedisCommand, SinkError=RedisError> + Send + 'static {
122    // Redis Streams protocol is a simple request-response protocol,
123    // and we should not receive more than one packet before the rx Receiver<StreamInternalCommand>
124    const BUFFER_SIZE: usize = 1;
125    let (tx, rx) =
126        channel::<StreamInternalCommand>(BUFFER_SIZE);
127
128    let output = fwd_from_channel_to_srv(to_srv, rx, options);
129    let input
130        = process_from_srv_and_notify_channel(from_srv, tx);
131
132    // We have the following conditions:
133    // 1) a return stream should include both output future and input stream
134    // 2) select() method requires equal types of Item within two merging streams,
135    // 3) a return stream should has Item = RedisValue
136    // 4) output future should not influence a return stream
137    //
138    // change Item to Option<RedisValue> within the input stream and output future
139    // where output future will not influence a selected stream (via filter_map())
140
141    let output = output.map(|_| None);
142    let input = input.map(|x| Some(x));
143
144    input.select(output.into_stream()).filter_map(|x| x)
145}
146
147pub(crate) fn subscribe_cmd(options: SubscribeOptions) -> RedisCommand
148{
149    let SubscribeOptions { streams, group } = options;
150
151    // receive only new messages (specifier is different for XREAD and XREADGROUP)
152    let id_specifier = match &group {
153        Some(_) => ">",
154        _ => "$"
155    };
156
157    let mut cmd =
158        match &group {
159            Some(_) => command("XREADGROUP"),
160            _ => command("XREAD"),
161        };
162
163    if let Some(RedisGroup { group, consumer }) = group {
164        cmd.arg_mut("GROUP");
165        cmd.arg_mut(group.as_str());
166        cmd.arg_mut(consumer.as_str());
167    }
168
169    let mut cmd =
170        cmd.arg("BLOCK")
171            .arg("0") // block until next pkt
172            .arg("STREAMS");
173
174    let mut ids_cmd = RedisCommand::new();
175    for stream in streams.into_iter() {
176        cmd.arg_mut(stream);
177        ids_cmd.arg_mut(id_specifier);
178    }
179
180    cmd.append(ids_cmd);
181    cmd
182}
183
184pub(crate) fn read_explicit_cmd(options: ReadExplicitOptions) -> RedisCommand
185{
186    let ReadExplicitOptions { streams, count } = options;
187
188    let mut cmd =
189        command("XREAD")
190            .arg("COUNT")
191            .arg(count as i64)
192            .arg("STREAMS");
193    let mut ids_cmd = RedisCommand::new();
194    for (stream, start_id) in streams.into_iter() {
195        cmd.arg_mut(stream);
196        ids_cmd.arg_mut(start_id.to_string());
197    }
198
199    cmd.append(ids_cmd);
200    cmd
201}
202
203pub(crate) fn range_cmd(options: RangeOptions) -> RedisCommand
204{
205    let RangeOptions { stream, count, range } = options;
206
207    let (left, right) = range.to_left_right();
208
209    command("XRANGE")
210        .arg(stream)
211        .arg(left)
212        .arg(right)
213        .arg("COUNT")
214        .arg(count as i64)
215}
216
217fn fwd_from_channel_to_srv<T>(to_srv: T,
218                              rx: Receiver<StreamInternalCommand>,
219                              options: SubscribeOptions)
220                              -> impl Future<Item=(), Error=RedisError> + Send + 'static
221    where T: Sink<SinkItem=RedisCommand, SinkError=RedisError> + Send + 'static {
222    rx
223        .map_err(|_| RedisError::new(RedisErrorKind::InternalError,
224                                     "Cannot read from internal channel".to_string()))
225        .fold(to_srv, move |to_srv, msg| {
226            match msg {
227                StreamInternalCommand::ListenNextMessage =>
228                    to_srv.send(subscribe_cmd(options.clone()))
229            }
230        })
231        .map(|_| ())
232}
233
234fn process_from_srv_and_notify_channel<F>(from_srv: F,
235                                          tx: Sender<StreamInternalCommand>)
236                                          -> impl Stream<Item=RedisValue, Error=RedisError> + Send + 'static
237    where F: Stream<Item=RespInternalValue, Error=RedisError> + Send + 'static
238{
239    from_srv
240        .and_then(move |msg| {
241            tx.clone().send(StreamInternalCommand::ListenNextMessage)
242                .then(|res| {
243                    match res {
244                        Ok(_) => (),
245                        Err(err) =>
246                            return Err(RedisError::new(RedisErrorKind::ConnectionError,
247                                                       format!("Could not send listen request: {:?}", err)))
248                    }
249                    // convert RespInternalValue to RedisValue
250                    // note: the function returns an error if the Resp value is Error
251                    //       else returns RedisValue
252                    RedisValue::from_resp_value(msg)
253                })
254        })
255}