use crate::core::{
    error::{RedisError, RedisResult},
    value::RespValue,
};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::time::Duration;
/// One entry of a stream: its ID together with the field/value pairs stored under it.
#[derive(Debug, Clone)]
pub struct StreamEntry {
    /// Entry ID. [`StreamEntry::timestamp`] and [`StreamEntry::sequence`] read it as
    /// `-`-separated numeric segments.
    pub id: String,
    /// Field name to value mapping carried by the entry.
    pub fields: HashMap<String, String>,
}

impl StreamEntry {
    /// Creates an entry from an ID and its fields.
    pub fn new(id: String, fields: HashMap<String, String>) -> Self {
        StreamEntry { id, fields }
    }

    /// Returns the value stored under `field`, or `None` if the entry has no such field.
    #[must_use]
    pub fn get_field(&self, field: &str) -> Option<&String> {
        self.fields.get(field)
    }

    /// Returns `true` when the entry contains `field`.
    #[must_use]
    pub fn has_field(&self, field: &str) -> bool {
        self.fields.get(field).is_some()
    }

    /// Parses the segment of the ID before the first `-` as a `u64`.
    ///
    /// Returns `None` when that segment is not a valid unsigned integer.
    #[must_use]
    pub fn timestamp(&self) -> Option<u64> {
        self.id
            .split('-')
            .next()
            .and_then(|head| head.parse::<u64>().ok())
    }

    /// Parses the second `-`-separated segment of the ID as a `u64`.
    ///
    /// Returns `None` when the ID has no second segment or it is not a valid unsigned integer.
    #[must_use]
    pub fn sequence(&self) -> Option<u64> {
        self.id
            .split('-')
            .nth(1)
            .and_then(|tail| tail.parse::<u64>().ok())
    }
}
/// Summary information about a stream, as produced by `parse_stream_info`.
#[derive(Debug, Clone)]
pub struct StreamInfo {
/// Value of the `length` key in the reply (0 when the key is absent).
pub length: u64,
/// Value of the `groups` key in the reply (0 when the key is absent).
pub groups: u64,
/// First element of the `first-entry` array (the entry ID, presumably); `None` when the
/// reply carries no usable `first-entry` value.
pub first_entry: Option<String>,
/// First element of the `last-entry` array (the entry ID, presumably); `None` when the
/// reply carries no usable `last-entry` value.
pub last_entry: Option<String>,
/// Value of the `last-generated-id` key in the reply (empty when the key is absent).
pub last_generated_id: String,
}
/// Description of a stream consumer group.
///
/// NOTE(review): nothing in this file constructs this type; the field notes below are
/// inferred from the field names only — confirm against the code that fills it in.
#[derive(Debug, Clone)]
pub struct ConsumerGroupInfo {
/// Group name.
pub name: String,
/// Presumably the number of consumers in the group — TODO confirm.
pub consumers: u64,
/// Presumably the number of pending (delivered but unacknowledged) entries — TODO confirm.
pub pending: u64,
/// Presumably the ID of the last entry delivered to the group — TODO confirm.
pub last_delivered_id: String,
}
/// Description of a single consumer.
///
/// NOTE(review): nothing in this file constructs this type; the field notes below are
/// inferred from the field names only — confirm against the code that fills it in.
#[derive(Debug, Clone)]
pub struct ConsumerInfo {
/// Consumer name.
pub name: String,
/// Presumably the number of entries pending for this consumer — TODO confirm.
pub pending: u64,
/// Presumably the consumer's idle time; the unit is not visible here — TODO confirm.
pub idle: u64,
}
/// Description of one pending stream message.
///
/// NOTE(review): nothing in this file constructs this type; the field notes below are
/// inferred from the field names only — confirm against the code that fills it in.
#[derive(Debug, Clone)]
pub struct PendingMessage {
/// ID of the pending entry.
pub id: String,
/// Presumably the name of the consumer that currently owns the entry — TODO confirm.
pub consumer: String,
/// Presumably the time since the entry was last delivered; unit not visible here — TODO confirm.
pub idle_time: u64,
/// Presumably how many times the entry has been delivered — TODO confirm.
pub delivery_count: u64,
}
/// An ID range with an optional cap on the number of entries.
#[derive(Debug, Clone)]
pub struct StreamRange {
    /// Lower bound of the range; `"-"` is used by [`StreamRange::all`] and [`StreamRange::to`].
    pub start: String,
    /// Upper bound of the range; `"+"` is used by [`StreamRange::all`] and [`StreamRange::from`].
    pub end: String,
    /// Optional entry limit; `None` until [`StreamRange::with_count`] is called.
    pub count: Option<u64>,
}

impl StreamRange {
    /// Creates a range between `start` and `end` with no count limit.
    pub fn new(start: impl Into<String>, end: impl Into<String>) -> Self {
        let (start, end) = (start.into(), end.into());
        Self {
            start,
            end,
            count: None,
        }
    }

    /// Returns the same range with its count limit set to `count`.
    pub fn with_count(self, count: u64) -> Self {
        Self {
            count: Some(count),
            ..self
        }
    }

    /// Range from `"-"` to `"+"`, with no count limit.
    pub fn all() -> Self {
        Self::from("-")
    }

    /// Range from `start` to `"+"`, with no count limit.
    pub fn from(start: impl Into<String>) -> Self {
        let upper = "+";
        Self::new(start, upper)
    }

    /// Range from `"-"` to `end`, with no count limit.
    pub fn to(end: impl Into<String>) -> Self {
        let lower = "-";
        Self::new(lower, end)
    }
}
/// Options for a stream read: an optional entry limit and an optional blocking timeout.
///
/// The default value has neither set.
#[derive(Debug, Clone, Default)]
pub struct ReadOptions {
    /// Optional limit on the number of entries.
    pub count: Option<u64>,
    /// Optional blocking timeout.
    pub block: Option<Duration>,
}

impl ReadOptions {
    /// Creates options with no count and no blocking timeout (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the same options with the count set to `count`.
    pub fn with_count(self, count: u64) -> Self {
        Self {
            count: Some(count),
            ..self
        }
    }

    /// Returns the same options with the blocking timeout set to `timeout`.
    pub fn with_block(self, timeout: Duration) -> Self {
        Self {
            block: Some(timeout),
            ..self
        }
    }

    /// Options that only set a blocking timeout.
    pub fn blocking(timeout: Duration) -> Self {
        Self {
            count: None,
            block: Some(timeout),
        }
    }

    /// Options that only set a count, leaving the blocking timeout unset.
    pub fn non_blocking(count: u64) -> Self {
        Self {
            count: Some(count),
            block: None,
        }
    }
}
/// Converts an array reply into a list of [`StreamEntry`] values.
///
/// `response` must be an array whose items are two-element arrays `[id, fields]`, where
/// `fields` is a flat array of alternating field names and values. A trailing element of
/// `fields` without a partner is ignored, and a repeated field name keeps its last value.
///
/// # Errors
///
/// Returns `RedisError::Type` when `response` is not an array, when an item is not a
/// two-element array, or when the second element of an item is not an array. Errors from
/// `as_string` on the ID, field names or values are propagated.
pub fn parse_stream_entries(response: RespValue) -> RedisResult<Vec<StreamEntry>> {
    let raw_entries = match response {
        RespValue::Array(raw_entries) => raw_entries,
        _ => {
            return Err(RedisError::Type(format!(
                "Expected array for stream entries, got: {:?}",
                response
            )))
        }
    };

    raw_entries
        .into_iter()
        .map(|raw_entry| -> RedisResult<StreamEntry> {
            let parts = match raw_entry {
                RespValue::Array(parts) if parts.len() == 2 => parts,
                _ => {
                    return Err(RedisError::Type(format!(
                        "Invalid stream entry format: {:?}",
                        raw_entry
                    )))
                }
            };

            let id = parts[0].as_string()?;

            let flat_fields = match &parts[1] {
                RespValue::Array(flat_fields) => flat_fields,
                other => {
                    return Err(RedisError::Type(format!(
                        "Invalid stream entry field format: {:?}",
                        other
                    )))
                }
            };

            // `chunks_exact` yields only complete name/value pairs, so an unpaired trailing
            // element is skipped.
            let mut fields = HashMap::new();
            for name_and_value in flat_fields.chunks_exact(2) {
                let name = name_and_value[0].as_string()?;
                let value = name_and_value[1].as_string()?;
                fields.insert(name, value);
            }

            Ok(StreamEntry::new(id, fields))
        })
        .collect()
}
/// Converts an XREAD-style reply into a map from stream name to that stream's entries.
///
/// `response` must be either `RespValue::Null`, which yields an empty map, or an array of
/// two-element arrays `[stream_name, entries]`, where `entries` has the shape accepted by
/// [`parse_stream_entries`]. If a stream name appears more than once, the last occurrence
/// replaces the earlier ones.
///
/// # Errors
///
/// Returns `RedisError::Type` when `response` is neither an array nor null, or when an item
/// is not a two-element array. Errors from `as_string` on the stream name and from
/// [`parse_stream_entries`] are propagated.
pub fn parse_xread_response(response: RespValue) -> RedisResult<HashMap<String, Vec<StreamEntry>>> {
    match response {
        RespValue::Array(streams) => {
            let mut result = HashMap::with_capacity(streams.len());
            for stream in streams {
                match stream {
                    RespValue::Array(mut stream_data) if stream_data.len() == 2 => {
                        // Take ownership of the entries instead of deep-cloning them; the
                        // guard above guarantees index 1 exists, so `remove` cannot panic.
                        let raw_entries = stream_data.remove(1);
                        let stream_name = stream_data[0].as_string()?;
                        let entries = parse_stream_entries(raw_entries)?;
                        result.insert(stream_name, entries);
                    }
                    _ => {
                        return Err(RedisError::Type(format!(
                            "Invalid XREAD response format: {:?}",
                            stream
                        )))
                    }
                }
            }
            Ok(result)
        }
        RespValue::Null => Ok(HashMap::new()),
        _ => Err(RedisError::Type(format!(
            "Expected array or null for XREAD response, got: {:?}",
            response
        ))),
    }
}
pub fn parse_stream_info(response: RespValue) -> RedisResult<StreamInfo> {
match response {
RespValue::Array(items) => {
let mut length = 0;
let mut groups = 0;
let mut first_entry = None;
let mut last_entry = None;
let mut last_generated_id = String::new();
for chunk in items.chunks(2) {
if chunk.len() == 2 {
let key = chunk[0].as_string()?;
match key.as_str() {
"length" => length = chunk[1].as_int()? as u64,
"groups" => groups = chunk[1].as_int()? as u64,
"first-entry" => {
if !chunk[1].is_null() {
if let RespValue::Array(entry) = &chunk[1] {
if !entry.is_empty() {
first_entry = Some(entry[0].as_string()?);
}
}
}
}
"last-entry" => {
if !chunk[1].is_null() {
if let RespValue::Array(entry) = &chunk[1] {
if !entry.is_empty() {
last_entry = Some(entry[0].as_string()?);
}
}
}
}
"last-generated-id" => last_generated_id = chunk[1].as_string()?,
_ => {} }
}
}
Ok(StreamInfo {
length,
groups,
first_entry,
last_entry,
last_generated_id,
})
}
_ => Err(RedisError::Type(format!(
"Expected array for stream info, got: {:?}",
response
))),
}
}
// Unit tests for the stream value types and the reply parsers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// An entry keeps the ID and fields it was built with, and the field accessors agree.
#[test]
fn test_stream_entry_creation() {
let mut fields = HashMap::new();
fields.insert("user".to_string(), "alice".to_string());
fields.insert("action".to_string(), "login".to_string());
let entry = StreamEntry::new("1234567890123-0".to_string(), fields.clone());
assert_eq!(entry.id, "1234567890123-0");
assert_eq!(entry.fields, fields);
assert_eq!(entry.get_field("user"), Some(&"alice".to_string()));
assert!(entry.has_field("action"));
assert!(!entry.has_field("nonexistent"));
}
// A well-formed `<number>-<number>` ID yields both numeric parts.
#[test]
fn test_stream_entry_timestamp_parsing() {
let entry = StreamEntry::new("1234567890123-5".to_string(), HashMap::new());
assert_eq!(entry.timestamp(), Some(1_234_567_890_123));
assert_eq!(entry.sequence(), Some(5));
}
// Non-numeric ID segments make both accessors return `None`.
#[test]
fn test_stream_entry_invalid_id() {
let entry = StreamEntry::new("invalid-id".to_string(), HashMap::new());
assert_eq!(entry.timestamp(), None);
assert_eq!(entry.sequence(), None);
}
// `new` followed by `with_count` stores the bounds and the limit.
#[test]
fn test_stream_range_creation() {
let range = StreamRange::new("1000", "2000").with_count(10);
assert_eq!(range.start, "1000");
assert_eq!(range.end, "2000");
assert_eq!(range.count, Some(10));
}
// The preset constructors fill the missing bound with "-" (start) or "+" (end).
#[test]
fn test_stream_range_presets() {
let all = StreamRange::all();
assert_eq!(all.start, "-");
assert_eq!(all.end, "+");
let from = StreamRange::from("1000");
assert_eq!(from.start, "1000");
assert_eq!(from.end, "+");
let to = StreamRange::to("2000");
assert_eq!(to.start, "-");
assert_eq!(to.end, "2000");
}
// Builder methods and the `blocking` / `non_blocking` shortcuts set the expected fields.
#[test]
fn test_read_options() {
let options = ReadOptions::new()
.with_count(5)
.with_block(Duration::from_secs(1));
assert_eq!(options.count, Some(5));
assert_eq!(options.block, Some(Duration::from_secs(1)));
let blocking = ReadOptions::blocking(Duration::from_millis(500));
assert_eq!(blocking.block, Some(Duration::from_millis(500)));
let non_blocking = ReadOptions::non_blocking(10);
assert_eq!(non_blocking.count, Some(10));
assert_eq!(non_blocking.block, None);
}
// Two `[id, [field, value, ...]]` items are parsed into two entries, in order.
#[test]
fn test_parse_stream_entries() {
let response = RespValue::Array(vec![
RespValue::Array(vec![
RespValue::from("1234567890123-0"),
RespValue::Array(vec![
RespValue::from("user"),
RespValue::from("alice"),
RespValue::from("action"),
RespValue::from("login"),
]),
]),
RespValue::Array(vec![
RespValue::from("1234567890124-0"),
RespValue::Array(vec![
RespValue::from("user"),
RespValue::from("bob"),
RespValue::from("action"),
RespValue::from("logout"),
]),
]),
]);
let entries = parse_stream_entries(response).unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].id, "1234567890123-0");
assert_eq!(entries[0].get_field("user"), Some(&"alice".to_string()));
assert_eq!(entries[0].get_field("action"), Some(&"login".to_string()));
assert_eq!(entries[1].id, "1234567890124-0");
assert_eq!(entries[1].get_field("user"), Some(&"bob".to_string()));
assert_eq!(entries[1].get_field("action"), Some(&"logout".to_string()));
}
// A single `[stream_name, entries]` pair becomes one map key holding that stream's entries.
#[test]
fn test_parse_xread_response() {
let response = RespValue::Array(vec![RespValue::Array(vec![
RespValue::from("stream1"),
RespValue::Array(vec![RespValue::Array(vec![
RespValue::from("1234567890123-0"),
RespValue::Array(vec![RespValue::from("field1"), RespValue::from("value1")]),
])]),
])]);
let result = parse_xread_response(response).unwrap();
assert_eq!(result.len(), 1);
assert!(result.contains_key("stream1"));
let entries = &result["stream1"];
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, "1234567890123-0");
assert_eq!(entries[0].get_field("field1"), Some(&"value1".to_string()));
}
// A null reply is treated as "no streams" rather than as an error.
#[test]
fn test_parse_xread_response_null() {
let response = RespValue::Null;
let result = parse_xread_response(response).unwrap();
assert!(result.is_empty());
}
}