use crate::{RedisValue, RedisResult, RedisError, RedisErrorKind, FromRedisValue, from_redis_value};
use std::num::ParseIntError;
use std::error::Error;
use std::fmt;
use std::collections::HashMap;
/// Identifier of a stream entry, stored as a pair of unsigned 64-bit numbers.
///
/// The constructor names the first component `ms` and the second `id`; the
/// textual form is `<ms>-<id>`.
///
/// Ordering is lexicographic over the pair (first component, then second).
/// `Eq`, `Ord` and `Hash` are derived in addition to the partial variants so
/// that ids can be sorted and used as keys of ordered or hashed collections.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntryId((u64, u64));
/// A single stream entry together with the name of the stream it belongs to.
#[derive(Clone, PartialEq)]
pub struct StreamEntry {
/// Name of the stream; `parse_stream_entries` fills it from the first
/// element of each decoded stream pair.
pub stream: String,
/// Identifier of the entry.
pub id: EntryId,
/// Key/value pairs carried by the entry.
pub values: HashMap<String, RedisValue>,
}
/// A single entry produced by `parse_range_entries`; unlike `StreamEntry` it
/// carries no stream name.
#[derive(Clone, PartialEq)]
pub struct RangeEntry {
/// Identifier of the entry.
pub id: EntryId,
/// Key/value pairs carried by the entry.
pub values: HashMap<String, RedisValue>,
}
impl StreamEntry {
pub(crate) fn new(stream: String, id: EntryId, values: HashMap<String, RedisValue>) -> Self {
StreamEntry {
stream,
id,
values,
}
}
}
impl RangeEntry {
pub(crate) fn new(id: EntryId, values: HashMap<String, RedisValue>) -> Self {
RangeEntry {
id,
values,
}
}
}
pub(crate) fn parse_stream_entries(value: RedisValue) -> RedisResult<Vec<StreamEntry>> {
const LEN_FACTOR: usize = 1;
let streams: Vec<StreamInfo> = from_redis_value(&value)?;
let capacity = streams.len() * LEN_FACTOR;
let mut stream_entries: Vec<StreamEntry> = Vec::with_capacity(capacity);
for StreamInfo { id, entries } in streams.into_iter() {
for entry in entries.into_iter() {
let stream_entry =
StreamEntry::new(id.clone(), EntryId::from_string(entry.id)?, entry.key_values);
stream_entries.push(stream_entry);
}
}
Ok(stream_entries)
}
pub fn parse_range_entries(value: RedisValue) -> RedisResult<Vec<RangeEntry>> {
let entries: Vec<EntryInfo> = from_redis_value(&value)?;
let mut result_entries: Vec<RangeEntry> = Vec::with_capacity(entries.len());
for entry in entries.into_iter() {
let entry =
RangeEntry::new(EntryId::from_string(entry.id)?, entry.key_values);
result_entries.push(entry);
}
Ok(result_entries)
}
/// Intermediate form of one stream in a reply, decoded from a
/// `(String, Vec<EntryInfo>)` pair.
struct StreamInfo {
/// First element of the decoded pair; `parse_stream_entries` uses it as the
/// stream name of every entry listed below.
id: String,
/// Entries listed under this stream, still carrying their ids as text.
entries: Vec<EntryInfo>,
}
/// Intermediate form of one entry, decoded from a
/// `(String, HashMap<String, RedisValue>)` pair.
#[derive(Debug)]
struct EntryInfo {
/// Entry id as received, not yet parsed into an `EntryId`.
id: String,
/// Key/value pairs of the entry.
key_values: HashMap<String, RedisValue>,
}
/// Bounds of a range request; `to_left_right` turns each variant into a
/// `(left, right)` pair of textual bounds.
#[derive(Clone)]
pub enum RangeType {
/// No bounds: rendered as `-` .. `+`.
Any,
/// Lower bound only: rendered as `<id>` .. `+`.
GreaterThan(EntryId),
/// Upper bound only: rendered as `-` .. `<id>`.
LessThan(EntryId),
/// Both bounds: rendered as `<left id>` .. `<right id>`.
GreaterLessThan(EntryId, EntryId),
}
impl RangeType {
    /// Reports whether the bounds describe a usable range.
    ///
    /// Only `GreaterLessThan` can be inconsistent: its left bound must not be
    /// greater than its right bound. Equal bounds are accepted because the
    /// bounds are passed on unchanged by `to_left_right` and Redis treats
    /// XRANGE bounds as inclusive, so `left == right` selects exactly one entry.
    pub fn is_valid(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide its own validity.
        match self {
            RangeType::GreaterLessThan(left, right) => left <= right,
            RangeType::Any | RangeType::GreaterThan(_) | RangeType::LessThan(_) => true,
        }
    }

    /// Renders the range as a `(left, right)` pair of textual bounds.
    ///
    /// A missing lower bound becomes `-`, a missing upper bound becomes `+`,
    /// and explicit bounds use the textual form of their `EntryId`.
    pub(crate) fn to_left_right(&self) -> (String, String) {
        const MIN_BOUND: &str = "-";
        const MAX_BOUND: &str = "+";

        match self {
            RangeType::Any => (MIN_BOUND.to_string(), MAX_BOUND.to_string()),
            RangeType::GreaterThan(left) => (left.to_string(), MAX_BOUND.to_string()),
            RangeType::LessThan(right) => (MIN_BOUND.to_string(), right.to_string()),
            RangeType::GreaterLessThan(left, right) => (left.to_string(), right.to_string()),
        }
    }
}
impl fmt::Debug for StreamEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(stream={}, id=\"{:?}\", {:?})", self.stream, self.id, self.values)?;
Ok(())
}
}
impl fmt::Debug for RangeEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(id=\"{:?}\", {:?})", self.id, self.values)?;
Ok(())
}
}
/// The debug form of an id is its plain textual form, as produced by `to_string`.
impl fmt::Debug for EntryId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.to_string();
        f.write_str(&rendered)
    }
}
impl EntryId {
pub fn new(ms: u64, id: u64) -> EntryId {
EntryId((ms, id))
}
pub(crate) fn from_string(id: String) -> RedisResult<EntryId> {
const ENTRY_ID_CHUNK_LEN: usize = 2;
const ENTRY_ID_MS_POS: usize = 0;
const ENTRY_ID_ID_POS: usize = 1;
let tokens: Vec<&str>
= id.split('-').filter(|token| !token.is_empty()).collect();
if tokens.len() != ENTRY_ID_CHUNK_LEN {
return Err(
RedisError::new(
RedisErrorKind::ParseError,
format!("Couldn't parse a Redis entry id: {:?}", &id))
);
}
let ms = tokens[ENTRY_ID_MS_POS].parse::<u64>().map_err(&to_redis_error)?;
let id = tokens[ENTRY_ID_ID_POS].parse::<u64>().map_err(&to_redis_error)?;
Ok(Self((ms, id)))
}
pub fn to_string(&self) -> String {
format!("{}-{}", (self.0).0, (self.0).1)
}
}
fn to_redis_error(err: ParseIntError) -> RedisError {
RedisError::new(RedisErrorKind::ParseError, err.description().to_string())
}
impl FromRedisValue for EntryInfo {
    /// Decodes `value` as an `(id, key/value map)` pair and wraps it into an
    /// `EntryInfo`; a decoding failure is passed through unchanged.
    fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
        let decoded: RedisResult<(String, HashMap<String, RedisValue>)> = from_redis_value(value);
        decoded.map(|(id, key_values)| EntryInfo { id, key_values })
    }
}
impl FromRedisValue for StreamInfo {
fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
let (id, entries): (String, Vec<EntryInfo>) = from_redis_value(value)?;
Ok(Self { id, entries })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `text` into a bulk-string value.
    fn bulk(text: &str) -> RedisValue {
        RedisValue::BulkString(text.as_bytes().to_vec())
    }

    /// Builds one raw entry: `[id, [field, value, ...]]`.
    fn raw_entry(id: &str, fields: Vec<RedisValue>) -> RedisValue {
        RedisValue::Array(vec![bulk(id), RedisValue::Array(fields)])
    }

    /// Builds one raw stream: `[name, [entry, ...]]`.
    fn raw_stream(name: &str, entries: Vec<RedisValue>) -> RedisValue {
        RedisValue::Array(vec![bulk(name), RedisValue::Array(entries)])
    }

    /// Builds a full reply holding a single stream named `stream` with one entry.
    fn single_entry_reply(entry: RedisValue) -> RedisValue {
        RedisValue::Array(vec![raw_stream("stream", vec![entry])])
    }

    /// Builds the key/value map expected for a parsed entry.
    fn expected_values(pairs: Vec<(&str, RedisValue)>) -> HashMap<String, RedisValue> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    #[test]
    fn common_test_parse_stream_entry() {
        let reply = RedisValue::Array(vec![
            raw_stream(
                "stream1",
                vec![
                    raw_entry(
                        "1581870410019-0",
                        vec![bulk("1key1"), bulk("1value1"), bulk("1key2"), RedisValue::Int(2)],
                    ),
                    raw_entry(
                        "1581870414714-0",
                        vec![
                            bulk("2key1"),
                            bulk("2value1"),
                            bulk("2key2"),
                            bulk("2value2"),
                            bulk("2key3"),
                            bulk("2value3"),
                        ],
                    ),
                ],
            ),
            raw_stream(
                "stream2",
                vec![raw_entry("1581855076637-0", vec![bulk("3key1"), bulk("3value1")])],
            ),
        ]);

        let expected = vec![
            StreamEntry::new(
                "stream1".to_string(),
                EntryId((1581870410019, 0)),
                expected_values(vec![("1key1", bulk("1value1")), ("1key2", RedisValue::Int(2))]),
            ),
            StreamEntry::new(
                "stream1".to_string(),
                EntryId((1581870414714, 0)),
                expected_values(vec![
                    ("2key1", bulk("2value1")),
                    ("2key2", bulk("2value2")),
                    ("2key3", bulk("2value3")),
                ]),
            ),
            StreamEntry::new(
                "stream2".to_string(),
                EntryId((1581855076637, 0)),
                expected_values(vec![("3key1", bulk("3value1"))]),
            ),
        ];

        let parsed = parse_stream_entries(reply).unwrap();
        assert_eq!(expected, parsed);
    }

    #[test]
    fn test_invalid_entry_id() {
        // The id uses `x` where the `-` separator is expected.
        let reply = single_entry_reply(raw_entry(
            "1581855076637x0",
            vec![bulk("key"), RedisValue::Int(2)],
        ));
        assert!(parse_stream_entries(reply).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_invalid_key_value() {
        // A key without a matching value.
        let reply = single_entry_reply(raw_entry("1581855076637-0", vec![bulk("key")]));
        assert!(parse_stream_entries(reply).is_err(), "Expect an parse error");
    }

    #[test]
    fn test_invalid_entry_structure() {
        // The entry holds only an id and lacks the key/value array altogether.
        let malformed_entry = RedisValue::Array(vec![bulk("1581855076637-0")]);
        let reply = single_entry_reply(malformed_entry);
        assert!(parse_stream_entries(reply).is_err(), "Expect an parse error");
    }
}