// redis_asio/stream/entry.rs

1use crate::{RedisValue, RedisResult, RedisError, RedisErrorKind, FromRedisValue, from_redis_value};
2use std::num::ParseIntError;
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6
/// Identifier of a Redis stream entry, stored as the pair `(milliseconds, id)`.
///
/// Comparison is derived from the inner tuple, so ids are ordered by the
/// milliseconds part first and by the second component on ties. Both
/// components are plain integers, hence the type also provides a total order
/// (`Eq`/`Ord`) and `Hash`, which makes it usable as a key in ordered and
/// hashed collections.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntryId((u64, u64));
9
10/// Structure that wraps a entry received on XREAD/XREADGROUP request.
11#[derive(Clone, PartialEq)]
12pub struct StreamEntry {
13    /// Stream name
14    pub stream: String,
15    /// Stream entry id is a simple string "milliseconds-id"
16    pub id: EntryId,
17    /// Note Redis allows to use key as a binary Bulk String
18    /// but in the library it is forbidden for easy of use API.
19    /// Value may be any of the RedisValue types
20    pub values: HashMap<String, RedisValue>,
21}
22
23/// Structure that wraps an range entry received on XRANGE request.
24#[derive(Clone, PartialEq)]
25pub struct RangeEntry {
26    /// Stream entry id is a simple string "milliseconds-id"
27    pub id: EntryId,
28    /// Note Redis allows to use key as a binary Bulk String
29    /// but in the library it is forbidden for easy of use API.
30    /// Value may be any of the RedisValue types
31    pub values: HashMap<String, RedisValue>,
32}
33
34impl StreamEntry {
35    pub(crate) fn new(stream: String, id: EntryId, values: HashMap<String, RedisValue>) -> Self {
36        StreamEntry {
37            stream,
38            id,
39            values,
40        }
41    }
42}
43
44impl RangeEntry {
45    pub(crate) fn new(id: EntryId, values: HashMap<String, RedisValue>) -> Self {
46        RangeEntry {
47            id,
48            values,
49        }
50    }
51}
52
53/// Parse XREAD/XREADGROUP result: RedisValue to vec of StreamEntry
54pub(crate) fn parse_stream_entries(value: RedisValue) -> RedisResult<Vec<StreamEntry>> {
55    // usually count of entries within one stream is 1,
56    // because in finally case we subscribe on only new messages
57    const LEN_FACTOR: usize = 1;
58
59    let streams: Vec<StreamInfo> = from_redis_value(&value)?;
60
61    let capacity = streams.len() * LEN_FACTOR;
62    let mut stream_entries: Vec<StreamEntry> = Vec::with_capacity(capacity);
63
64    // transform Vec<EntryInfo> to Vec<StreamEntry>
65    for StreamInfo { id, entries } in streams.into_iter() {
66        for entry in entries.into_iter() {
67            let stream_entry =
68                StreamEntry::new(id.clone(), EntryId::from_string(entry.id)?, entry.key_values);
69
70            stream_entries.push(stream_entry);
71        }
72    }
73
74    Ok(stream_entries)
75}
76
77/// Parse XRANGE result: RedisValue to vec of StreamEntry
78pub fn parse_range_entries(value: RedisValue) -> RedisResult<Vec<RangeEntry>> {
79    let entries: Vec<EntryInfo> = from_redis_value(&value)?;
80
81    let mut result_entries: Vec<RangeEntry> = Vec::with_capacity(entries.len());
82
83    // transform the Vec<EntryInfo> to Vec<RangeEntry>
84    for entry in entries.into_iter() {
85        let entry =
86            RangeEntry::new(EntryId::from_string(entry.id)?, entry.key_values);
87
88        result_entries.push(entry);
89    }
90
91    Ok(result_entries)
92}
93
94/// Internal structure is used to parse RedisValue into StreamEntry
95struct StreamInfo {
96    id: String,
97    entries: Vec<EntryInfo>,
98}
99
100/// Internal structure is used to parse RedisValue into StreamEntry
101#[derive(Debug)]
102struct EntryInfo {
103    id: String,
104    key_values: HashMap<String, RedisValue>,
105}
106
107#[derive(Clone)]
108pub enum RangeType {
109    Any,
110    GreaterThan(EntryId),
111    LessThan(EntryId),
112    GreaterLessThan(EntryId, EntryId),
113}
114
115impl RangeType {
116    /// Check if the left bound is less than the right bound
117    pub fn is_valid(&self) -> bool {
118        match self {
119            RangeType::GreaterLessThan(left, right) => left < right,
120            _ => true
121        }
122    }
123
124    pub(crate) fn to_left_right(&self) -> (String, String) {
125        match self {
126            RangeType::Any => ("-".to_string(), "+".to_string()),
127            RangeType::GreaterThan(left) => (left.to_string(), "+".to_string()),
128            RangeType::LessThan(right) => ("-".to_string(), right.to_string()),
129            RangeType::GreaterLessThan(left, right) => (left.to_string(), right.to_string()),
130        }
131    }
132}
133
134impl fmt::Debug for StreamEntry {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        write!(f, "(stream={}, id=\"{:?}\", {:?})", self.stream, self.id, self.values)?;
137        Ok(())
138    }
139}
140
141impl fmt::Debug for RangeEntry {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        write!(f, "(id=\"{:?}\", {:?})", self.id, self.values)?;
144        Ok(())
145    }
146}
147
148impl fmt::Debug for EntryId {
149    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150        write!(f, "{}", self.to_string())?;
151        Ok(())
152    }
153}
154
155impl EntryId {
156    pub fn new(ms: u64, id: u64) -> EntryId {
157        EntryId((ms, id))
158    }
159
160    // Parse the Redis Stream Entry as pair: <milliseconds, id>
161    pub(crate) fn from_string(id: String) -> RedisResult<EntryId> {
162        const ENTRY_ID_CHUNK_LEN: usize = 2;
163        const ENTRY_ID_MS_POS: usize = 0;
164        const ENTRY_ID_ID_POS: usize = 1;
165
166        let tokens: Vec<&str>
167            = id.split('-').filter(|token| !token.is_empty()).collect();
168        if tokens.len() != ENTRY_ID_CHUNK_LEN {
169            return Err(
170                RedisError::new(
171                    RedisErrorKind::ParseError,
172                    format!("Couldn't parse a Redis entry id: {:?}", &id))
173            );
174        }
175
176        let ms = tokens[ENTRY_ID_MS_POS].parse::<u64>().map_err(&to_redis_error)?;
177        let id = tokens[ENTRY_ID_ID_POS].parse::<u64>().map_err(&to_redis_error)?;
178        Ok(Self((ms, id)))
179    }
180
181    pub fn to_string(&self) -> String {
182        format!("{}-{}", (self.0).0, (self.0).1)
183    }
184}
185
186fn to_redis_error(err: ParseIntError) -> RedisError {
187    RedisError::new(RedisErrorKind::ParseError, err.description().to_string())
188}
189
190impl FromRedisValue for EntryInfo {
191    fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
192        let (id, key_values): (String, HashMap<String, RedisValue>) = from_redis_value(value)?;
193
194        Ok(EntryInfo { id, key_values })
195    }
196}
197
198impl FromRedisValue for StreamInfo {
199    fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
200        let (id, entries): (String, Vec<EntryInfo>) = from_redis_value(value)?;
201        Ok(Self { id, entries })
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a Bulk String value.
    fn bulk(data: &str) -> RedisValue {
        RedisValue::BulkString(data.as_bytes().to_vec())
    }

    /// Wire shape of one entry: `[id, [key1, value1, key2, value2, ...]]`.
    fn raw_entry(id: &str, fields: Vec<RedisValue>) -> RedisValue {
        RedisValue::Array(vec![bulk(id), RedisValue::Array(fields)])
    }

    /// Wire shape of one stream: `[name, [entry, entry, ...]]`.
    fn raw_stream(name: &str, entries: Vec<RedisValue>) -> RedisValue {
        RedisValue::Array(vec![bulk(name), RedisValue::Array(entries)])
    }

    /// Expected key/value map of a parsed entry.
    fn expected_values(pairs: Vec<(&str, RedisValue)>) -> HashMap<String, RedisValue> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    #[test]
    fn common_test_parse_stream_entry() {
        let entry1 = raw_entry(
            "1581870410019-0",
            vec![
                bulk("1key1"),
                bulk("1value1"),
                bulk("1key2"),
                RedisValue::Int(2),
            ],
        );

        let entry2 = raw_entry(
            "1581870414714-0",
            vec![
                bulk("2key1"),
                bulk("2value1"),
                bulk("2key2"),
                bulk("2value2"),
                bulk("2key3"),
                bulk("2value3"),
            ],
        );

        let entry3 = raw_entry("1581855076637-0", vec![bulk("3key1"), bulk("3value1")]);

        let value = RedisValue::Array(vec![
            raw_stream("stream1", vec![entry1, entry2]),
            raw_stream("stream2", vec![entry3]),
        ]);

        let result = parse_stream_entries(value).unwrap();

        let origin = vec![
            StreamEntry::new(
                "stream1".to_string(),
                EntryId::new(1581870410019, 0),
                expected_values(vec![
                    ("1key1", bulk("1value1")),
                    ("1key2", RedisValue::Int(2)),
                ]),
            ),
            StreamEntry::new(
                "stream1".to_string(),
                EntryId::new(1581870414714, 0),
                expected_values(vec![
                    ("2key1", bulk("2value1")),
                    ("2key2", bulk("2value2")),
                    ("2key3", bulk("2value3")),
                ]),
            ),
            StreamEntry::new(
                "stream2".to_string(),
                EntryId::new(1581855076637, 0),
                expected_values(vec![("3key1", bulk("3value1"))]),
            ),
        ];

        assert_eq!(origin, result);
    }

    #[test]
    fn test_invalid_entry_id() {
        // x instead of -
        let entry = raw_entry("1581855076637x0", vec![bulk("key"), RedisValue::Int(2)]);
        let value = RedisValue::Array(vec![raw_stream("stream", vec![entry])]);

        assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_invalid_key_value() {
        // there is only key without value
        let entry = raw_entry("1581855076637-0", vec![bulk("key")]);
        let value = RedisValue::Array(vec![raw_stream("stream", vec![entry])]);

        assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_invalid_entry_structure() {
        // there is no keys and values
        let entry = RedisValue::Array(vec![bulk("1581855076637-0")]);
        let value = RedisValue::Array(vec![raw_stream("stream", vec![entry])]);

        assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_parse_range_entries() {
        let value = RedisValue::Array(vec![
            raw_entry("1581870410019-0", vec![bulk("key1"), bulk("value1")]),
            raw_entry("1581870410019-1", vec![bulk("key2"), RedisValue::Int(2)]),
        ]);

        let result = parse_range_entries(value).unwrap();

        let origin = vec![
            RangeEntry::new(
                EntryId::new(1581870410019, 0),
                expected_values(vec![("key1", bulk("value1"))]),
            ),
            RangeEntry::new(
                EntryId::new(1581870410019, 1),
                expected_values(vec![("key2", RedisValue::Int(2))]),
            ),
        ];

        assert_eq!(origin, result);
    }

    #[test]
    fn test_parse_range_entries_invalid_id() {
        // x instead of -
        let value = RedisValue::Array(vec![raw_entry(
            "1581855076637x0",
            vec![bulk("key"), bulk("value")],
        )]);

        assert!(parse_range_entries(value).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_entry_id_round_trip() {
        let id = EntryId::from_string("1581870410019-7".to_string()).unwrap();

        assert_eq!(EntryId::new(1581870410019, 7), id);
        assert_eq!("1581870410019-7", id.to_string());
        assert_eq!("1581870410019-7", format!("{:?}", id));
    }

    #[test]
    fn test_entry_id_malformed() {
        for raw in vec!["", "1", "1-2-3", "a-1", "1-b", "1x0"] {
            assert!(
                EntryId::from_string(raw.to_string()).is_err(),
                "Expect an parse error"
            );
        }
    }

    #[test]
    fn test_entry_id_ordering() {
        assert!(EntryId::new(1, 5) < EntryId::new(2, 0));
        assert!(EntryId::new(1, 0) < EntryId::new(1, 1));
        assert!(EntryId::new(3, 3) == EntryId::new(3, 3));
    }

    #[test]
    fn test_range_type_to_left_right() {
        let low = EntryId::new(1, 0);
        let high = EntryId::new(2, 5);

        assert_eq!(
            ("-".to_string(), "+".to_string()),
            RangeType::Any.to_left_right()
        );
        assert_eq!(
            ("1-0".to_string(), "+".to_string()),
            RangeType::GreaterThan(low.clone()).to_left_right()
        );
        assert_eq!(
            ("-".to_string(), "2-5".to_string()),
            RangeType::LessThan(high.clone()).to_left_right()
        );
        assert_eq!(
            ("1-0".to_string(), "2-5".to_string()),
            RangeType::GreaterLessThan(low, high).to_left_right()
        );
    }

    #[test]
    fn test_range_type_is_valid() {
        let low = EntryId::new(1, 0);
        let high = EntryId::new(2, 5);

        assert!(RangeType::Any.is_valid());
        assert!(RangeType::GreaterThan(low.clone()).is_valid());
        assert!(RangeType::LessThan(high.clone()).is_valid());
        assert!(RangeType::GreaterLessThan(low.clone(), high.clone()).is_valid());
        assert!(!RangeType::GreaterLessThan(high, low).is_valid());
    }
}