pyra-streams 0.4.4

Redis Stream consumer infrastructure for Pyra services
Documentation
use redis::Value;

/// Parse the response from XREADGROUP into a list of (msg_id, fields) tuples.
///
/// XREADGROUP returns: `[[stream_key, [[msg_id, [field, value, ...]], ...]]]`
pub fn parse_stream_response(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
    let streams = match as_array(value) {
        Some(s) => s,
        None => return Vec::new(),
    };

    let mut messages = Vec::new();
    for stream in streams {
        let stream_data = match as_array(stream) {
            Some(s) if s.len() >= 2 => s,
            _ => continue,
        };

        let entries = match as_array(&stream_data[1]) {
            Some(e) => e,
            None => continue,
        };

        for entry in entries {
            if let Some((id, fields)) = parse_message_entry(entry) {
                messages.push((id, fields));
            }
        }
    }
    messages
}

/// Parse messages returned by XCLAIM into a list of (msg_id, fields) tuples.
///
/// XCLAIM returns: `[[msg_id, [field, value, ...]], ...]`
pub fn parse_claimed_messages(value: &Value) -> Vec<(String, Vec<(String, String)>)> {
    let entries = match as_array(value) {
        Some(e) => e,
        None => return Vec::new(),
    };

    let mut messages = Vec::new();
    for entry in entries {
        if let Some((id, fields)) = parse_message_entry(entry) {
            messages.push((id, fields));
        }
    }
    messages
}

/// Parses one stream message of the form `[msg_id, [field, value, ...]]`.
///
/// Returns `None` when the entry is not an array, has fewer than two elements,
/// has an id that is not a string, or has a field list that is not an array.
///
/// The flat field list is read as consecutive `(field, value)` pairs. A trailing
/// element without a partner is ignored, and a pair in which either side is not
/// a string is dropped without failing the whole message.
fn parse_message_entry(entry: &Value) -> Option<(String, Vec<(String, String)>)> {
    let parts = as_array(entry)?;
    let [id_value, payload, ..] = parts.as_slice() else {
        return None;
    };

    let msg_id = bulk_str(id_value)?;
    let fields = as_array(payload)?
        // `chunks_exact(2)` walks the list pair by pair and leaves out an odd trailing element.
        .chunks_exact(2)
        .filter_map(|pair| bulk_str(&pair[0]).zip(bulk_str(&pair[1])))
        .collect();

    Some((msg_id, fields))
}

/// Converts an XPENDING reply into `(msg_id, consumer, idle_ms, delivery_count)` tuples.
///
/// Each entry is expected to look like `[msg_id, consumer_name, idle_ms, delivery_count]`.
///
/// A reply that is not an array yields an empty list. An entry is skipped when
/// it is not an array, has fewer than four elements, or any of its first four
/// elements cannot be read as the expected string or integer. A negative idle
/// time is reported as `0`.
pub fn parse_pending_entries(value: &Value) -> Vec<(String, String, u64, i64)> {
    let Some(entries) = as_array(value) else {
        return Vec::new();
    };

    entries
        .iter()
        .filter_map(|entry| {
            let parts = as_array(entry).filter(|items| items.len() >= 4)?;

            let id = bulk_str(&parts[0])?;
            let consumer = bulk_str(&parts[1])?;
            let idle = as_integer(&parts[2])?;
            let count = as_integer(&parts[3])?;

            // The idle time is exposed as unsigned, so a negative value is clamped to zero.
            let idle_ms = u64::try_from(idle).unwrap_or(0);
            Some((id, consumer, idle_ms, count))
        })
        .collect()
}

/// Returns the value of the first `"data"` field in a list of `(key, value)` pairs.
///
/// Yields `None` when no pair has the key `"data"`.
pub fn extract_data_field(fields: &[(String, String)]) -> Option<&str> {
    const DATA_FIELD: &str = "data";
    extract_field(fields, DATA_FIELD)
}

/// Looks up a field by name in a list of `(key, value)` pairs.
///
/// Returns the value of the first pair whose key equals `field_name` exactly,
/// borrowed from `fields`, or `None` when no key matches.
pub fn extract_field<'a>(fields: &'a [(String, String)], field_name: &str) -> Option<&'a str> {
    fields
        .iter()
        .find_map(|(key, val)| (key == field_name).then_some(val.as_str()))
}

/// Reads a Redis `Value` as an owned `String`.
///
/// Accepts `BulkString` (which must be valid UTF-8) and `SimpleString`.
/// Returns `None` for any other variant and for bulk strings that are not
/// valid UTF-8.
pub fn bulk_str(value: &Value) -> Option<String> {
    if let Value::BulkString(raw) = value {
        // Validate on the borrowed bytes first, then copy only valid text.
        return std::str::from_utf8(raw).ok().map(str::to_owned);
    }
    if let Value::SimpleString(text) = value {
        return Some(text.clone());
    }
    None
}

/// Borrows the elements of a Redis `Value` when it is an `Array`.
///
/// Returns `None` for every other variant.
pub fn as_array(value: &Value) -> Option<&Vec<Value>> {
    if let Value::Array(items) = value {
        Some(items)
    } else {
        None
    }
}

/// Reads a Redis `Value` as an `i64`.
///
/// Accepts an `Int` directly, or a `BulkString` whose bytes are valid UTF-8 and
/// parse as an `i64`. Returns `None` for any other variant and for bulk strings
/// that are not valid UTF-8 or do not parse as an integer.
pub fn as_integer(value: &Value) -> Option<i64> {
    match value {
        Value::Int(n) => Some(*n),
        // Parse straight from the borrowed bytes; copying them into an owned
        // `String` first would allocate for no benefit.
        Value::BulkString(bytes) => std::str::from_utf8(bytes).ok()?.parse().ok(),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use redis::Value;

    /// Builds a `BulkString` value holding the bytes of `text`.
    fn bulk(text: &str) -> Value {
        Value::BulkString(text.as_bytes().to_vec())
    }

    /// Builds an owned `(key, value)` pair in the form the parsers produce.
    fn pair(key: &str, val: &str) -> (String, String) {
        (key.to_string(), val.to_string())
    }

    #[test]
    fn test_parse_stream_response_empty() {
        let parsed = parse_stream_response(&Value::Array(Vec::new()));
        assert!(parsed.is_empty());
    }

    #[test]
    fn test_parse_stream_response_nil() {
        let parsed = parse_stream_response(&Value::Nil);
        assert!(parsed.is_empty());
    }

    #[test]
    fn test_parse_stream_response_single_message() {
        // Reply shape: [[stream_key, [[msg_id, [field, value]]]]]
        let field_list = Value::Array(vec![bulk("data"), bulk("{\"key\":\"val\"}")]);
        let entry = Value::Array(vec![bulk("1234-0"), field_list]);
        let stream = Value::Array(vec![bulk("mystream"), Value::Array(vec![entry])]);
        let reply = Value::Array(vec![stream]);

        let expected = vec![(
            "1234-0".to_string(),
            vec![pair("data", "{\"key\":\"val\"}")],
        )];
        assert_eq!(parse_stream_response(&reply), expected);
    }

    #[test]
    fn test_parse_claimed_messages() {
        let field_list = Value::Array(vec![bulk("data"), bulk("test")]);
        let reply = Value::Array(vec![Value::Array(vec![bulk("5678-0"), field_list])]);

        let messages = parse_claimed_messages(&reply);
        // Only the number of messages and the id are checked here.
        let ids: Vec<&str> = messages.iter().map(|(id, _)| id.as_str()).collect();
        assert_eq!(ids, ["5678-0"]);
    }

    #[test]
    fn test_parse_pending_entries() {
        let reply = Value::Array(vec![Value::Array(vec![
            bulk("1234-0"),
            bulk("worker-1"),
            Value::Int(65000),
            Value::Int(3),
        ])]);

        let expected = vec![("1234-0".to_string(), "worker-1".to_string(), 65000_u64, 3_i64)];
        assert_eq!(parse_pending_entries(&reply), expected);
    }

    #[test]
    fn test_extract_data_field() {
        let fields = vec![pair("key1", "val1"), pair("data", "payload")];
        assert_eq!(extract_data_field(&fields), Some("payload"));

        let no_fields: Vec<(String, String)> = Vec::new();
        assert_eq!(extract_data_field(&no_fields), None);
    }

    #[test]
    fn test_bulk_str_variants() {
        assert_eq!(bulk_str(&bulk("hello")).as_deref(), Some("hello"));
        assert_eq!(
            bulk_str(&Value::SimpleString("world".to_string())).as_deref(),
            Some("world")
        );
        assert_eq!(bulk_str(&Value::Nil), None);
    }

    #[test]
    fn test_as_integer_variants() {
        let cases = [
            (Value::Int(42), Some(42)),
            (bulk("99"), Some(99)),
            (Value::Nil, None),
        ];
        for (input, expected) in &cases {
            assert_eq!(as_integer(input), *expected);
        }
    }
}