1use redis::Value;
2
3pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
7 let streams = match as_array(value) {
8 Some(s) => s,
9 None => return Vec::new(),
10 };
11
12 let mut messages = Vec::new();
13 for stream in streams {
14 let stream_data = match as_array(stream) {
15 Some(s) if s.len() >= 2 => s,
16 _ => continue,
17 };
18
19 let entries = match as_array(&stream_data[1]) {
20 Some(e) => e,
21 None => continue,
22 };
23
24 for entry in entries {
25 if let Some((id, fields)) = parse_message_entry(entry) {
26 messages.push((id, fields));
27 }
28 }
29 }
30 messages
31}
32
33pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
37 let entries = match as_array(value) {
38 Some(e) => e,
39 None => return Vec::new(),
40 };
41
42 let mut messages = Vec::new();
43 for entry in entries {
44 if let Some((id, fields)) = parse_message_entry(entry) {
45 messages.push((id, fields));
46 }
47 }
48 messages
49}
50
51fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
53 let parts = as_array(entry)?;
54 if parts.len() < 2 {
55 return None;
56 }
57
58 let msg_id = bulk_str(&parts[0])?;
59 let field_values = as_array(&parts[1])?;
60
61 let mut fields = Vec::new();
62 let mut i = 0;
63 while i < field_values.len().saturating_sub(1) {
64 if let (Some(key), Some(val)) = (bulk_str(&field_values[i]), bulk_str(&field_values[i.saturating_add(1)])) {
65 fields.push((key, val));
66 }
67 i = i.saturating_add(2);
68 }
69
70 Some((msg_id, fields))
71}
72
73pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
77 let entries = match as_array(value) {
78 Some(e) => e,
79 None => return Vec::new(),
80 };
81
82 let mut result = Vec::new();
83 for entry in entries {
84 let parts = match as_array(entry) {
85 Some(p) if p.len() >= 4 => p,
86 _ => continue,
87 };
88
89 if let (Some(id), Some(consumer), Some(idle), Some(count)) = (
90 bulk_str(&parts[0]),
91 bulk_str(&parts[1]),
92 as_integer(&parts[2]),
93 as_integer(&parts[3]),
94 ) {
95 result.push((id, consumer, u64::try_from(idle).unwrap_or(0), count));
96 }
97 }
98 result
99}
100
101pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
103 fields
104 .iter()
105 .find(|(k, _)| k == "data")
106 .map(|(_, v)| v.as_str())
107}
108
109pub fn bulk_str(value: &Value) -> Option<String> {
111 match value {
112 Value::BulkString(bytes) => String::from_utf8(bytes.clone()).ok(),
113 Value::SimpleString(s) => Some(s.clone()),
114 _ => None,
115 }
116}
117
118pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
120 match value {
121 Value::Array(arr) => Some(arr),
122 _ => None,
123 }
124}
125
126pub fn as_integer(value: &Value) -> Option<i64> {
128 match value {
129 Value::Int(i) => Some(*i),
130 Value::BulkString(bytes) => String::from_utf8(bytes.clone())
131 .ok()
132 .and_then(|s| s.parse().ok()),
133 _ => None,
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140 use redis::Value;
141
142 #[test]
143 fn test_parse_stream_response_empty() {
144 let value = Value::Array(vec![]);
145 assert!(parse_stream_response(&value).is_empty());
146 }
147
148 #[test]
149 fn test_parse_stream_response_nil() {
150 let value = Value::Nil;
151 assert!(parse_stream_response(&value).is_empty());
152 }
153
154 #[test]
155 fn test_parse_stream_response_single_message() {
156 let value = Value::Array(vec![Value::Array(vec![
158 Value::BulkString(b"mystream".to_vec()),
159 Value::Array(vec![Value::Array(vec![
160 Value::BulkString(b"1234-0".to_vec()),
161 Value::Array(vec![
162 Value::BulkString(b"data".to_vec()),
163 Value::BulkString(b"{\"key\":\"val\"}".to_vec()),
164 ]),
165 ])]),
166 ])]);
167
168 let messages = parse_stream_response(&value);
169 assert_eq!(messages.len(), 1);
170 assert_eq!(messages[0].0, "1234-0");
171 assert_eq!(messages[0].1.len(), 1);
172 assert_eq!(messages[0].1[0].0, "data");
173 assert_eq!(messages[0].1[0].1, "{\"key\":\"val\"}");
174 }
175
176 #[test]
177 fn test_parse_claimed_messages() {
178 let value = Value::Array(vec![Value::Array(vec![
179 Value::BulkString(b"5678-0".to_vec()),
180 Value::Array(vec![
181 Value::BulkString(b"data".to_vec()),
182 Value::BulkString(b"test".to_vec()),
183 ]),
184 ])]);
185
186 let messages = parse_claimed_messages(&value);
187 assert_eq!(messages.len(), 1);
188 assert_eq!(messages[0].0, "5678-0");
189 }
190
191 #[test]
192 fn test_parse_pending_entries() {
193 let value = Value::Array(vec![Value::Array(vec![
194 Value::BulkString(b"1234-0".to_vec()),
195 Value::BulkString(b"worker-1".to_vec()),
196 Value::Int(65000),
197 Value::Int(3),
198 ])]);
199
200 let entries = parse_pending_entries(&value);
201 assert_eq!(entries.len(), 1);
202 assert_eq!(entries[0].0, "1234-0");
203 assert_eq!(entries[0].1, "worker-1");
204 assert_eq!(entries[0].2, 65000);
205 assert_eq!(entries[0].3, 3);
206 }
207
208 #[test]
209 fn test_extract_data_field() {
210 let fields = vec![
211 ("key1".to_string(), "val1".to_string()),
212 ("data".to_string(), "payload".to_string()),
213 ];
214 assert_eq!(extract_data_field(&fields), Some("payload"));
215
216 let empty: Vec<(String, String)> = vec![];
217 assert_eq!(extract_data_field(&empty), None);
218 }
219
220 #[test]
221 fn test_bulk_str_variants() {
222 assert_eq!(
223 bulk_str(&Value::BulkString(b"hello".to_vec())),
224 Some("hello".to_string())
225 );
226 assert_eq!(
227 bulk_str(&Value::SimpleString("world".to_string())),
228 Some("world".to_string())
229 );
230 assert_eq!(bulk_str(&Value::Nil), None);
231 }
232
233 #[test]
234 fn test_as_integer_variants() {
235 assert_eq!(as_integer(&Value::Int(42)), Some(42));
236 assert_eq!(
237 as_integer(&Value::BulkString(b"99".to_vec())),
238 Some(99)
239 );
240 assert_eq!(as_integer(&Value::Nil), None);
241 }
242}