use redis::Value;
pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
let streams = match as_array(value) {
Some(s) => s,
None => return Vec::new(),
};
let mut messages = Vec::new();
for stream in streams {
let stream_data = match as_array(stream) {
Some(s) if s.len() >= 2 => s,
_ => continue,
};
let entries = match as_array(&stream_data[1]) {
Some(e) => e,
None => continue,
};
for entry in entries {
if let Some((id, fields)) = parse_message_entry(entry) {
messages.push((id, fields));
}
}
}
messages
}
pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
let entries = match as_array(value) {
Some(e) => e,
None => return Vec::new(),
};
let mut messages = Vec::new();
for entry in entries {
if let Some((id, fields)) = parse_message_entry(entry) {
messages.push((id, fields));
}
}
messages
}
fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
let parts = as_array(entry)?;
if parts.len() < 2 {
return None;
}
let msg_id = bulk_str(&parts[0])?;
let field_values = as_array(&parts[1])?;
let mut fields = Vec::new();
let mut i = 0;
while i < field_values.len().saturating_sub(1) {
if let (Some(key), Some(val)) = (bulk_str(&field_values[i]), bulk_str(&field_values[i.saturating_add(1)])) {
fields.push((key, val));
}
i = i.saturating_add(2);
}
Some((msg_id, fields))
}
pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
let entries = match as_array(value) {
Some(e) => e,
None => return Vec::new(),
};
let mut result = Vec::new();
for entry in entries {
let parts = match as_array(entry) {
Some(p) if p.len() >= 4 => p,
_ => continue,
};
if let (Some(id), Some(consumer), Some(idle), Some(count)) = (
bulk_str(&parts[0]),
bulk_str(&parts[1]),
as_integer(&parts[2]),
as_integer(&parts[3]),
) {
result.push((id, consumer, u64::try_from(idle).unwrap_or(0), count));
}
}
result
}
pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
extract_field(fields, "data")
}
pub fn extract_field<'a>(fields: &'a [(String, String)], field_name: &str) -> Option<&'a str> {
fields
.iter()
.find(|(k, _)| k == field_name)
.map(|(_, v)| v.as_str())
}
pub fn bulk_str(value: &Value) -> Option<String> {
match value {
Value::BulkString(bytes) => String::from_utf8(bytes.clone()).ok(),
Value::SimpleString(s) => Some(s.clone()),
_ => None,
}
}
pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
match value {
Value::Array(arr) => Some(arr),
_ => None,
}
}
pub fn as_integer(value: &Value) -> Option<i64> {
match value {
Value::Int(i) => Some(*i),
Value::BulkString(bytes) => String::from_utf8(bytes.clone())
.ok()
.and_then(|s| s.parse().ok()),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use redis::Value;
#[test]
fn test_parse_stream_response_empty() {
let value = Value::Array(vec![]);
assert!(parse_stream_response(&value).is_empty());
}
#[test]
fn test_parse_stream_response_nil() {
let value = Value::Nil;
assert!(parse_stream_response(&value).is_empty());
}
#[test]
fn test_parse_stream_response_single_message() {
let value = Value::Array(vec![Value::Array(vec![
Value::BulkString(b"mystream".to_vec()),
Value::Array(vec![Value::Array(vec![
Value::BulkString(b"1234-0".to_vec()),
Value::Array(vec![
Value::BulkString(b"data".to_vec()),
Value::BulkString(b"{\"key\":\"val\"}".to_vec()),
]),
])]),
])]);
let messages = parse_stream_response(&value);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, "1234-0");
assert_eq!(messages[0].1.len(), 1);
assert_eq!(messages[0].1[0].0, "data");
assert_eq!(messages[0].1[0].1, "{\"key\":\"val\"}");
}
#[test]
fn test_parse_claimed_messages() {
let value = Value::Array(vec![Value::Array(vec![
Value::BulkString(b"5678-0".to_vec()),
Value::Array(vec![
Value::BulkString(b"data".to_vec()),
Value::BulkString(b"test".to_vec()),
]),
])]);
let messages = parse_claimed_messages(&value);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, "5678-0");
}
#[test]
fn test_parse_pending_entries() {
let value = Value::Array(vec![Value::Array(vec![
Value::BulkString(b"1234-0".to_vec()),
Value::BulkString(b"worker-1".to_vec()),
Value::Int(65000),
Value::Int(3),
])]);
let entries = parse_pending_entries(&value);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].0, "1234-0");
assert_eq!(entries[0].1, "worker-1");
assert_eq!(entries[0].2, 65000);
assert_eq!(entries[0].3, 3);
}
#[test]
fn test_extract_data_field() {
let fields = vec![
("key1".to_string(), "val1".to_string()),
("data".to_string(), "payload".to_string()),
];
assert_eq!(extract_data_field(&fields), Some("payload"));
let empty: Vec<(String, String)> = vec![];
assert_eq!(extract_data_field(&empty), None);
}
#[test]
fn test_bulk_str_variants() {
assert_eq!(
bulk_str(&Value::BulkString(b"hello".to_vec())),
Some("hello".to_string())
);
assert_eq!(
bulk_str(&Value::SimpleString("world".to_string())),
Some("world".to_string())
);
assert_eq!(bulk_str(&Value::Nil), None);
}
#[test]
fn test_as_integer_variants() {
assert_eq!(as_integer(&Value::Int(42)), Some(42));
assert_eq!(
as_integer(&Value::BulkString(b"99".to_vec())),
Some(99)
);
assert_eq!(as_integer(&Value::Nil), None);
}
}