Skip to main content

pyra_streams/
parse.rs

1use redis::Value;
2
3/// Parse the response from XREADGROUP into a list of (msg_id, fields) tuples.
4///
5/// XREADGROUP returns: `[[stream_key, [[msg_id, [field, value, ...]], ...]]]`
6pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
7    let streams = match as_array(value) {
8        Some(s) => s,
9        None => return Vec::new(),
10    };
11
12    let mut messages = Vec::new();
13    for stream in streams {
14        let stream_data = match as_array(stream) {
15            Some(s) if s.len() >= 2 => s,
16            _ => continue,
17        };
18
19        let entries = match as_array(&stream_data[1]) {
20            Some(e) => e,
21            None => continue,
22        };
23
24        for entry in entries {
25            if let Some((id, fields)) = parse_message_entry(entry) {
26                messages.push((id, fields));
27            }
28        }
29    }
30    messages
31}
32
33/// Parse messages returned by XCLAIM into a list of (msg_id, fields) tuples.
34///
35/// XCLAIM returns: `[[msg_id, [field, value, ...]], ...]`
36pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
37    let entries = match as_array(value) {
38        Some(e) => e,
39        None => return Vec::new(),
40    };
41
42    let mut messages = Vec::new();
43    for entry in entries {
44        if let Some((id, fields)) = parse_message_entry(entry) {
45            messages.push((id, fields));
46        }
47    }
48    messages
49}
50
51/// Parse a single message entry: `[msg_id, [field, value, ...]]`
52fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
53    let parts = as_array(entry)?;
54    if parts.len() < 2 {
55        return None;
56    }
57
58    let msg_id = bulk_str(&parts[0])?;
59    let field_values = as_array(&parts[1])?;
60
61    let mut fields = Vec::new();
62    let mut i = 0;
63    while i < field_values.len().saturating_sub(1) {
64        if let (Some(key), Some(val)) = (bulk_str(&field_values[i]), bulk_str(&field_values[i.saturating_add(1)])) {
65            fields.push((key, val));
66        }
67        i = i.saturating_add(2);
68    }
69
70    Some((msg_id, fields))
71}
72
73/// Parse XPENDING response into (msg_id, consumer, idle_ms, delivery_count) tuples.
74///
75/// Each entry: `[msg_id, consumer_name, idle_ms, delivery_count]`
76pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
77    let entries = match as_array(value) {
78        Some(e) => e,
79        None => return Vec::new(),
80    };
81
82    let mut result = Vec::new();
83    for entry in entries {
84        let parts = match as_array(entry) {
85            Some(p) if p.len() >= 4 => p,
86            _ => continue,
87        };
88
89        if let (Some(id), Some(consumer), Some(idle), Some(count)) = (
90            bulk_str(&parts[0]),
91            bulk_str(&parts[1]),
92            as_integer(&parts[2]),
93            as_integer(&parts[3]),
94        ) {
95            result.push((id, consumer, u64::try_from(idle).unwrap_or(0), count));
96        }
97    }
98    result
99}
100
101/// Extract the value of the first field named "data" from a list of (key, value) pairs.
102pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
103    extract_field(fields, "data")
104}
105
106/// Extract the value of a named field from a list of (key, value) pairs.
107pub fn extract_field<'a>(fields: &'a [(String, String)], field_name: &str) -> Option<&'a str> {
108    fields
109        .iter()
110        .find(|(k, _)| k == field_name)
111        .map(|(_, v)| v.as_str())
112}
113
114/// Extract a bulk string value from a Redis Value.
115pub fn bulk_str(value: &Value) -> Option<String> {
116    match value {
117        Value::BulkString(bytes) => String::from_utf8(bytes.clone()).ok(),
118        Value::SimpleString(s) => Some(s.clone()),
119        _ => None,
120    }
121}
122
123/// Extract an array from a Redis Value.
124pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
125    match value {
126        Value::Array(arr) => Some(arr),
127        _ => None,
128    }
129}
130
131/// Extract an integer from a Redis Value (handles both Int and BulkString representations).
132pub fn as_integer(value: &Value) -> Option<i64> {
133    match value {
134        Value::Int(i) => Some(*i),
135        Value::BulkString(bytes) => String::from_utf8(bytes.clone())
136            .ok()
137            .and_then(|s| s.parse().ok()),
138        _ => None,
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145    use redis::Value;
146
147    #[test]
148    fn test_parse_stream_response_empty() {
149        let value = Value::Array(vec![]);
150        assert!(parse_stream_response(&value).is_empty());
151    }
152
153    #[test]
154    fn test_parse_stream_response_nil() {
155        let value = Value::Nil;
156        assert!(parse_stream_response(&value).is_empty());
157    }
158
159    #[test]
160    fn test_parse_stream_response_single_message() {
161        // [[stream_key, [[msg_id, [field, value]]]]]
162        let value = Value::Array(vec![Value::Array(vec![
163            Value::BulkString(b"mystream".to_vec()),
164            Value::Array(vec![Value::Array(vec![
165                Value::BulkString(b"1234-0".to_vec()),
166                Value::Array(vec![
167                    Value::BulkString(b"data".to_vec()),
168                    Value::BulkString(b"{\"key\":\"val\"}".to_vec()),
169                ]),
170            ])]),
171        ])]);
172
173        let messages = parse_stream_response(&value);
174        assert_eq!(messages.len(), 1);
175        assert_eq!(messages[0].0, "1234-0");
176        assert_eq!(messages[0].1.len(), 1);
177        assert_eq!(messages[0].1[0].0, "data");
178        assert_eq!(messages[0].1[0].1, "{\"key\":\"val\"}");
179    }
180
181    #[test]
182    fn test_parse_claimed_messages() {
183        let value = Value::Array(vec![Value::Array(vec![
184            Value::BulkString(b"5678-0".to_vec()),
185            Value::Array(vec![
186                Value::BulkString(b"data".to_vec()),
187                Value::BulkString(b"test".to_vec()),
188            ]),
189        ])]);
190
191        let messages = parse_claimed_messages(&value);
192        assert_eq!(messages.len(), 1);
193        assert_eq!(messages[0].0, "5678-0");
194    }
195
196    #[test]
197    fn test_parse_pending_entries() {
198        let value = Value::Array(vec![Value::Array(vec![
199            Value::BulkString(b"1234-0".to_vec()),
200            Value::BulkString(b"worker-1".to_vec()),
201            Value::Int(65000),
202            Value::Int(3),
203        ])]);
204
205        let entries = parse_pending_entries(&value);
206        assert_eq!(entries.len(), 1);
207        assert_eq!(entries[0].0, "1234-0");
208        assert_eq!(entries[0].1, "worker-1");
209        assert_eq!(entries[0].2, 65000);
210        assert_eq!(entries[0].3, 3);
211    }
212
213    #[test]
214    fn test_extract_data_field() {
215        let fields = vec![
216            ("key1".to_string(), "val1".to_string()),
217            ("data".to_string(), "payload".to_string()),
218        ];
219        assert_eq!(extract_data_field(&fields), Some("payload"));
220
221        let empty: Vec<(String, String)> = vec![];
222        assert_eq!(extract_data_field(&empty), None);
223    }
224
225    #[test]
226    fn test_bulk_str_variants() {
227        assert_eq!(
228            bulk_str(&Value::BulkString(b"hello".to_vec())),
229            Some("hello".to_string())
230        );
231        assert_eq!(
232            bulk_str(&Value::SimpleString("world".to_string())),
233            Some("world".to_string())
234        );
235        assert_eq!(bulk_str(&Value::Nil), None);
236    }
237
238    #[test]
239    fn test_as_integer_variants() {
240        assert_eq!(as_integer(&Value::Int(42)), Some(42));
241        assert_eq!(
242            as_integer(&Value::BulkString(b"99".to_vec())),
243            Some(99)
244        );
245        assert_eq!(as_integer(&Value::Nil), None);
246    }
247}