Skip to main content

pyra_streams/
parse.rs

1use redis::Value;
2
3/// Parse the response from XREADGROUP into a list of (msg_id, fields) tuples.
4///
5/// XREADGROUP returns: `[[stream_key, [[msg_id, [field, value, ...]], ...]]]`
6pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
7    let streams = match as_array(value) {
8        Some(s) => s,
9        None => return Vec::new(),
10    };
11
12    let mut messages = Vec::new();
13    for stream in streams {
14        let stream_data = match as_array(stream) {
15            Some(s) if s.len() >= 2 => s,
16            _ => continue,
17        };
18
19        let entries = match as_array(&stream_data[1]) {
20            Some(e) => e,
21            None => continue,
22        };
23
24        for entry in entries {
25            if let Some((id, fields)) = parse_message_entry(entry) {
26                messages.push((id, fields));
27            }
28        }
29    }
30    messages
31}
32
33/// Parse messages returned by XCLAIM into a list of (msg_id, fields) tuples.
34///
35/// XCLAIM returns: `[[msg_id, [field, value, ...]], ...]`
36pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
37    let entries = match as_array(value) {
38        Some(e) => e,
39        None => return Vec::new(),
40    };
41
42    let mut messages = Vec::new();
43    for entry in entries {
44        if let Some((id, fields)) = parse_message_entry(entry) {
45            messages.push((id, fields));
46        }
47    }
48    messages
49}
50
51/// Parse a single message entry: `[msg_id, [field, value, ...]]`
52fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
53    let parts = as_array(entry)?;
54    if parts.len() < 2 {
55        return None;
56    }
57
58    let msg_id = bulk_str(&parts[0])?;
59    let field_values = as_array(&parts[1])?;
60
61    let mut fields = Vec::new();
62    let mut i = 0;
63    while i < field_values.len().saturating_sub(1) {
64        if let (Some(key), Some(val)) = (bulk_str(&field_values[i]), bulk_str(&field_values[i.saturating_add(1)])) {
65            fields.push((key, val));
66        }
67        i = i.saturating_add(2);
68    }
69
70    Some((msg_id, fields))
71}
72
73/// Parse XPENDING response into (msg_id, consumer, idle_ms, delivery_count) tuples.
74///
75/// Each entry: `[msg_id, consumer_name, idle_ms, delivery_count]`
76pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
77    let entries = match as_array(value) {
78        Some(e) => e,
79        None => return Vec::new(),
80    };
81
82    let mut result = Vec::new();
83    for entry in entries {
84        let parts = match as_array(entry) {
85            Some(p) if p.len() >= 4 => p,
86            _ => continue,
87        };
88
89        if let (Some(id), Some(consumer), Some(idle), Some(count)) = (
90            bulk_str(&parts[0]),
91            bulk_str(&parts[1]),
92            as_integer(&parts[2]),
93            as_integer(&parts[3]),
94        ) {
95            result.push((id, consumer, u64::try_from(idle).unwrap_or(0), count));
96        }
97    }
98    result
99}
100
101/// Extract the value of the first field named "data" from a list of (key, value) pairs.
102pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
103    fields
104        .iter()
105        .find(|(k, _)| k == "data")
106        .map(|(_, v)| v.as_str())
107}
108
109/// Extract a bulk string value from a Redis Value.
110pub fn bulk_str(value: &Value) -> Option<String> {
111    match value {
112        Value::BulkString(bytes) => String::from_utf8(bytes.clone()).ok(),
113        Value::SimpleString(s) => Some(s.clone()),
114        _ => None,
115    }
116}
117
118/// Extract an array from a Redis Value.
119pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
120    match value {
121        Value::Array(arr) => Some(arr),
122        _ => None,
123    }
124}
125
126/// Extract an integer from a Redis Value (handles both Int and BulkString representations).
127pub fn as_integer(value: &Value) -> Option<i64> {
128    match value {
129        Value::Int(i) => Some(*i),
130        Value::BulkString(bytes) => String::from_utf8(bytes.clone())
131            .ok()
132            .and_then(|s| s.parse().ok()),
133        _ => None,
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::*;
140    use redis::Value;
141
142    #[test]
143    fn test_parse_stream_response_empty() {
144        let value = Value::Array(vec![]);
145        assert!(parse_stream_response(&value).is_empty());
146    }
147
148    #[test]
149    fn test_parse_stream_response_nil() {
150        let value = Value::Nil;
151        assert!(parse_stream_response(&value).is_empty());
152    }
153
154    #[test]
155    fn test_parse_stream_response_single_message() {
156        // [[stream_key, [[msg_id, [field, value]]]]]
157        let value = Value::Array(vec![Value::Array(vec![
158            Value::BulkString(b"mystream".to_vec()),
159            Value::Array(vec![Value::Array(vec![
160                Value::BulkString(b"1234-0".to_vec()),
161                Value::Array(vec![
162                    Value::BulkString(b"data".to_vec()),
163                    Value::BulkString(b"{\"key\":\"val\"}".to_vec()),
164                ]),
165            ])]),
166        ])]);
167
168        let messages = parse_stream_response(&value);
169        assert_eq!(messages.len(), 1);
170        assert_eq!(messages[0].0, "1234-0");
171        assert_eq!(messages[0].1.len(), 1);
172        assert_eq!(messages[0].1[0].0, "data");
173        assert_eq!(messages[0].1[0].1, "{\"key\":\"val\"}");
174    }
175
176    #[test]
177    fn test_parse_claimed_messages() {
178        let value = Value::Array(vec![Value::Array(vec![
179            Value::BulkString(b"5678-0".to_vec()),
180            Value::Array(vec![
181                Value::BulkString(b"data".to_vec()),
182                Value::BulkString(b"test".to_vec()),
183            ]),
184        ])]);
185
186        let messages = parse_claimed_messages(&value);
187        assert_eq!(messages.len(), 1);
188        assert_eq!(messages[0].0, "5678-0");
189    }
190
191    #[test]
192    fn test_parse_pending_entries() {
193        let value = Value::Array(vec![Value::Array(vec![
194            Value::BulkString(b"1234-0".to_vec()),
195            Value::BulkString(b"worker-1".to_vec()),
196            Value::Int(65000),
197            Value::Int(3),
198        ])]);
199
200        let entries = parse_pending_entries(&value);
201        assert_eq!(entries.len(), 1);
202        assert_eq!(entries[0].0, "1234-0");
203        assert_eq!(entries[0].1, "worker-1");
204        assert_eq!(entries[0].2, 65000);
205        assert_eq!(entries[0].3, 3);
206    }
207
208    #[test]
209    fn test_extract_data_field() {
210        let fields = vec![
211            ("key1".to_string(), "val1".to_string()),
212            ("data".to_string(), "payload".to_string()),
213        ];
214        assert_eq!(extract_data_field(&fields), Some("payload"));
215
216        let empty: Vec<(String, String)> = vec![];
217        assert_eq!(extract_data_field(&empty), None);
218    }
219
220    #[test]
221    fn test_bulk_str_variants() {
222        assert_eq!(
223            bulk_str(&Value::BulkString(b"hello".to_vec())),
224            Some("hello".to_string())
225        );
226        assert_eq!(
227            bulk_str(&Value::SimpleString("world".to_string())),
228            Some("world".to_string())
229        );
230        assert_eq!(bulk_str(&Value::Nil), None);
231    }
232
233    #[test]
234    fn test_as_integer_variants() {
235        assert_eq!(as_integer(&Value::Int(42)), Some(42));
236        assert_eq!(
237            as_integer(&Value::BulkString(b"99".to_vec())),
238            Some(99)
239        );
240        assert_eq!(as_integer(&Value::Nil), None);
241    }
242}