1use redis::Value;
2
3pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
7 let streams = match as_array(value) {
8 Some(s) => s,
9 None => return Vec::new(),
10 };
11
12 let mut messages = Vec::new();
13 for stream in streams {
14 let stream_data = match as_array(stream) {
15 Some(s) if s.len() >= 2 => s,
16 _ => continue,
17 };
18
19 let entries = match as_array(&stream_data[1]) {
20 Some(e) => e,
21 None => continue,
22 };
23
24 for entry in entries {
25 if let Some((id, fields)) = parse_message_entry(entry) {
26 messages.push((id, fields));
27 }
28 }
29 }
30 messages
31}
32
33pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
37 let entries = match as_array(value) {
38 Some(e) => e,
39 None => return Vec::new(),
40 };
41
42 let mut messages = Vec::new();
43 for entry in entries {
44 if let Some((id, fields)) = parse_message_entry(entry) {
45 messages.push((id, fields));
46 }
47 }
48 messages
49}
50
51fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
53 let parts = as_array(entry)?;
54 if parts.len() < 2 {
55 return None;
56 }
57
58 let msg_id = bulk_str(&parts[0])?;
59 let field_values = as_array(&parts[1])?;
60
61 let mut fields = Vec::new();
62 let mut i = 0;
63 while i < field_values.len().saturating_sub(1) {
64 if let (Some(key), Some(val)) = (bulk_str(&field_values[i]), bulk_str(&field_values[i.saturating_add(1)])) {
65 fields.push((key, val));
66 }
67 i = i.saturating_add(2);
68 }
69
70 Some((msg_id, fields))
71}
72
73pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
77 let entries = match as_array(value) {
78 Some(e) => e,
79 None => return Vec::new(),
80 };
81
82 let mut result = Vec::new();
83 for entry in entries {
84 let parts = match as_array(entry) {
85 Some(p) if p.len() >= 4 => p,
86 _ => continue,
87 };
88
89 if let (Some(id), Some(consumer), Some(idle), Some(count)) = (
90 bulk_str(&parts[0]),
91 bulk_str(&parts[1]),
92 as_integer(&parts[2]),
93 as_integer(&parts[3]),
94 ) {
95 result.push((id, consumer, u64::try_from(idle).unwrap_or(0), count));
96 }
97 }
98 result
99}
100
101pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
103 extract_field(fields, "data")
104}
105
/// Returns the value of the first field whose key equals `field_name`.
///
/// The returned slice borrows from `fields`. Returns `None` when no key
/// matches, including when `fields` is empty.
pub fn extract_field<'a>(fields: &'a [(String, String)], field_name: &str) -> Option<&'a str> {
    for (key, value) in fields {
        if key == field_name {
            return Some(value.as_str());
        }
    }
    None
}
113
114pub fn bulk_str(value: &Value) -> Option<String> {
116 match value {
117 Value::BulkString(bytes) => String::from_utf8(bytes.clone()).ok(),
118 Value::SimpleString(s) => Some(s.clone()),
119 _ => None,
120 }
121}
122
123pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
125 match value {
126 Value::Array(arr) => Some(arr),
127 _ => None,
128 }
129}
130
131pub fn as_integer(value: &Value) -> Option<i64> {
133 match value {
134 Value::Int(i) => Some(*i),
135 Value::BulkString(bytes) => String::from_utf8(bytes.clone())
136 .ok()
137 .and_then(|s| s.parse().ok()),
138 _ => None,
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use redis::Value;

    /// Builds a bulk-string reply value from UTF-8 text.
    fn bulk(text: &str) -> Value {
        Value::BulkString(text.as_bytes().to_vec())
    }

    #[test]
    fn test_parse_stream_response_empty() {
        let parsed = parse_stream_response(&Value::Array(Vec::new()));
        assert_eq!(parsed.len(), 0);
    }

    #[test]
    fn test_parse_stream_response_nil() {
        let parsed = parse_stream_response(&Value::Nil);
        assert_eq!(parsed.len(), 0);
    }

    #[test]
    fn test_parse_stream_response_single_message() {
        // Build the nested reply from the inside out: entry -> stream -> reply.
        let entry = Value::Array(vec![
            bulk("1234-0"),
            Value::Array(vec![bulk("data"), bulk("{\"key\":\"val\"}")]),
        ]);
        let stream = Value::Array(vec![bulk("mystream"), Value::Array(vec![entry])]);
        let reply = Value::Array(vec![stream]);

        let expected = vec![(
            "1234-0".to_string(),
            vec![("data".to_string(), "{\"key\":\"val\"}".to_string())],
        )];
        assert_eq!(parse_stream_response(&reply), expected);
    }

    #[test]
    fn test_parse_claimed_messages() {
        let entry = Value::Array(vec![
            bulk("5678-0"),
            Value::Array(vec![bulk("data"), bulk("test")]),
        ]);
        let reply = Value::Array(vec![entry]);

        let claimed = parse_claimed_messages(&reply);
        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].0, "5678-0");
    }

    #[test]
    fn test_parse_pending_entries() {
        let record = Value::Array(vec![
            bulk("1234-0"),
            bulk("worker-1"),
            Value::Int(65000),
            Value::Int(3),
        ]);
        let reply = Value::Array(vec![record]);

        let expected = vec![("1234-0".to_string(), "worker-1".to_string(), 65000u64, 3i64)];
        assert_eq!(parse_pending_entries(&reply), expected);
    }

    #[test]
    fn test_extract_data_field() {
        let populated = vec![
            ("key1".to_string(), "val1".to_string()),
            ("data".to_string(), "payload".to_string()),
        ];
        assert_eq!(extract_data_field(&populated), Some("payload"));

        let no_fields: Vec<(String, String)> = Vec::new();
        assert_eq!(extract_data_field(&no_fields), None);
    }

    #[test]
    fn test_bulk_str_variants() {
        assert_eq!(bulk_str(&bulk("hello")), Some("hello".to_string()));
        assert_eq!(
            bulk_str(&Value::SimpleString("world".to_string())),
            Some("world".to_string())
        );
        assert_eq!(bulk_str(&Value::Nil), None);
    }

    #[test]
    fn test_as_integer_variants() {
        assert_eq!(as_integer(&Value::Int(42)), Some(42));
        assert_eq!(as_integer(&bulk("99")), Some(99));
        assert_eq!(as_integer(&Value::Nil), None);
    }
}