redis_asio/stream/
consume.rs1use crate::{RedisResult, RedisValue, RedisError, RedisErrorKind, RespInternalValue,
2 RedisCommand, command};
3use super::{EntryId, RangeType, StreamEntry, parse_stream_entries};
4use futures::{Stream, Future, Sink};
5use futures::sync::mpsc::{channel, Sender, Receiver};
6use futures::Async;
7
8#[derive(Clone)]
10pub struct SubscribeOptions {
11 pub(crate) streams: Vec<String>,
13 pub(crate) group: Option<RedisGroup>,
15}
16
17#[derive(Clone)]
19pub struct ReadExplicitOptions {
20 pub(crate) streams: Vec<(String, EntryId)>,
22 pub(crate) count: u16,
24}
25
26#[derive(Clone)]
28pub struct RangeOptions {
29 pub(crate) stream: String,
31 pub(crate) count: u16,
33 pub(crate) range: RangeType,
35}
36
37#[derive(Clone)]
39pub struct RedisGroup {
40 pub(crate) group: String,
42 pub(crate) consumer: String,
44}
45
46pub struct Subscribe {
48 pub(crate) stream: Box<dyn Stream<Item=RedisValue, Error=RedisError> + Send + 'static>,
49}
50
51impl Stream for Subscribe {
52 type Item = Vec<StreamEntry>;
53 type Error = RedisError;
54
55 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
56 self.stream.poll()
57 .and_then(|value| {
58 let value = match value {
59 Async::Ready(x) => x,
60 _ => return Ok(Async::NotReady)
61 };
62 let value = match value {
63 Some(x) => x,
64 _ => return Ok(Async::Ready(None)),
65 };
66
67 parse_stream_entries(value)
68 .map(|stream_entries| Async::Ready(Some(stream_entries)))
69 })
70 }
71}
72
73impl SubscribeOptions {
74 pub fn new(stream: Vec<String>) -> SubscribeOptions {
75 let group: Option<RedisGroup> = None;
76 SubscribeOptions { streams: stream, group }
77 }
78
79 pub fn with_group(stream: Vec<String>, group: RedisGroup) -> SubscribeOptions {
80 let group = Some(group);
81 SubscribeOptions { streams: stream, group }
82 }
83}
84
85impl ReadExplicitOptions {
86 pub fn new(stream: String, start_id: EntryId, count: u16) -> ReadExplicitOptions {
87 let streams = vec![(stream, start_id)];
88 ReadExplicitOptions { streams, count }
89 }
90
91 pub fn add_stream(&mut self, stream: String, start_id: EntryId) {
92 self.streams.push((stream, start_id))
93 }
94}
95
96impl RangeOptions {
97 pub fn new(stream: String, count: u16, range: RangeType) -> RedisResult<RangeOptions> {
98 if !range.is_valid() {
99 return Err(
100 RedisError::new(RedisErrorKind::InvalidOptions,
101 format!("Left bound should be less than right bound")));
102 }
103
104 Ok(RangeOptions { stream, count, range })
105 }
106}
107
108impl RedisGroup {
109 pub fn new(group: String, consumer: String) -> RedisGroup {
110 RedisGroup { group, consumer }
111 }
112}
113
114enum StreamInternalCommand {
115 ListenNextMessage,
116}
117
118pub(crate) fn subscribe<F, T>(from_srv: F, to_srv: T, options: SubscribeOptions)
119 -> impl Stream<Item=RedisValue, Error=RedisError> + Send + 'static
120 where F: Stream<Item=RespInternalValue, Error=RedisError> + Send + 'static,
121 T: Sink<SinkItem=RedisCommand, SinkError=RedisError> + Send + 'static {
122 const BUFFER_SIZE: usize = 1;
125 let (tx, rx) =
126 channel::<StreamInternalCommand>(BUFFER_SIZE);
127
128 let output = fwd_from_channel_to_srv(to_srv, rx, options);
129 let input
130 = process_from_srv_and_notify_channel(from_srv, tx);
131
132 let output = output.map(|_| None);
142 let input = input.map(|x| Some(x));
143
144 input.select(output.into_stream()).filter_map(|x| x)
145}
146
147pub(crate) fn subscribe_cmd(options: SubscribeOptions) -> RedisCommand
148{
149 let SubscribeOptions { streams, group } = options;
150
151 let id_specifier = match &group {
153 Some(_) => ">",
154 _ => "$"
155 };
156
157 let mut cmd =
158 match &group {
159 Some(_) => command("XREADGROUP"),
160 _ => command("XREAD"),
161 };
162
163 if let Some(RedisGroup { group, consumer }) = group {
164 cmd.arg_mut("GROUP");
165 cmd.arg_mut(group.as_str());
166 cmd.arg_mut(consumer.as_str());
167 }
168
169 let mut cmd =
170 cmd.arg("BLOCK")
171 .arg("0") .arg("STREAMS");
173
174 let mut ids_cmd = RedisCommand::new();
175 for stream in streams.into_iter() {
176 cmd.arg_mut(stream);
177 ids_cmd.arg_mut(id_specifier);
178 }
179
180 cmd.append(ids_cmd);
181 cmd
182}
183
184pub(crate) fn read_explicit_cmd(options: ReadExplicitOptions) -> RedisCommand
185{
186 let ReadExplicitOptions { streams, count } = options;
187
188 let mut cmd =
189 command("XREAD")
190 .arg("COUNT")
191 .arg(count as i64)
192 .arg("STREAMS");
193 let mut ids_cmd = RedisCommand::new();
194 for (stream, start_id) in streams.into_iter() {
195 cmd.arg_mut(stream);
196 ids_cmd.arg_mut(start_id.to_string());
197 }
198
199 cmd.append(ids_cmd);
200 cmd
201}
202
203pub(crate) fn range_cmd(options: RangeOptions) -> RedisCommand
204{
205 let RangeOptions { stream, count, range } = options;
206
207 let (left, right) = range.to_left_right();
208
209 command("XRANGE")
210 .arg(stream)
211 .arg(left)
212 .arg(right)
213 .arg("COUNT")
214 .arg(count as i64)
215}
216
217fn fwd_from_channel_to_srv<T>(to_srv: T,
218 rx: Receiver<StreamInternalCommand>,
219 options: SubscribeOptions)
220 -> impl Future<Item=(), Error=RedisError> + Send + 'static
221 where T: Sink<SinkItem=RedisCommand, SinkError=RedisError> + Send + 'static {
222 rx
223 .map_err(|_| RedisError::new(RedisErrorKind::InternalError,
224 "Cannot read from internal channel".to_string()))
225 .fold(to_srv, move |to_srv, msg| {
226 match msg {
227 StreamInternalCommand::ListenNextMessage =>
228 to_srv.send(subscribe_cmd(options.clone()))
229 }
230 })
231 .map(|_| ())
232}
233
234fn process_from_srv_and_notify_channel<F>(from_srv: F,
235 tx: Sender<StreamInternalCommand>)
236 -> impl Stream<Item=RedisValue, Error=RedisError> + Send + 'static
237 where F: Stream<Item=RespInternalValue, Error=RedisError> + Send + 'static
238{
239 from_srv
240 .and_then(move |msg| {
241 tx.clone().send(StreamInternalCommand::ListenNextMessage)
242 .then(|res| {
243 match res {
244 Ok(_) => (),
245 Err(err) =>
246 return Err(RedisError::new(RedisErrorKind::ConnectionError,
247 format!("Could not send listen request: {:?}", err)))
248 }
249 RedisValue::from_resp_value(msg)
253 })
254 })
255}