use core::{fmt, time, cmp};
use core::str::FromStr;
use core::convert::TryInto;
use redis::{self, FromRedisValue, ErrorKind, Value, RedisResult, RedisError, RedisWrite, ToRedisArgs};
use crate::queue::QueueConfig;
pub(crate) mod idents {
macro_rules! define_term {
($($ident:ident),+) => {
$(
pub const $ident: &str = stringify!($ident);
)+
};
}
define_term!(TYPE, XGROUP, CREATE, MKSTREAM, BUSYGROUP, TIME, XLEN, XADD, XREADGROUP, XPENDING, XACK, XDEL);
define_term!(XINFO, GROUPS, XTRIM);
define_term!(GROUP, COUNT, BLOCK, STREAMS, IDLE, MAXLEN, MINID);
}
fn parse_redis_key(value: &redis::Value) -> Result<&str, RedisError> {
match value {
redis::Value::Data(ref data) => match core::str::from_utf8(data) {
Ok(key) => Ok(key),
Err(_) => Err((redis::ErrorKind::TypeError, "Non-UTF8 stream field's name").into()),
},
_ => Err((redis::ErrorKind::TypeError, "Invalid stream field's name").into()),
}
}
macro_rules! assign_field_if {
($field:ident = $value:ident IF $key:ident == $expected:expr) => {
if $field.is_none() && $key.eq_ignore_ascii_case($expected) {
$field = Some(FromRedisValue::from_redis_value($value)?);
continue;
}
};
}
#[cold]
#[inline(never)]
fn unlikely_redis_error(kind: redis::ErrorKind, text: &'static str) -> RedisError {
(kind, text).into()
}
macro_rules! unwrap_required_field {
($field:ident) => {
match $field {
Some(field) => field,
None => {
return Err(unlikely_redis_error(
redis::ErrorKind::TypeError,
concat!("'", stringify!($field), "' is missing"),
))
}
}
};
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum RedisType {
String,
List,
Set,
ZSet,
Hash,
Stream,
None,
}
impl RedisType {
#[inline(always)]
pub fn parse(value: &str) -> Option<Self> {
if value.eq_ignore_ascii_case("string") {
Some(Self::String)
} else if value.eq_ignore_ascii_case("list") {
Some(Self::List)
} else if value.eq_ignore_ascii_case("set") {
Some(Self::Set)
} else if value.eq_ignore_ascii_case("zset") {
Some(Self::ZSet)
} else if value.eq_ignore_ascii_case("hash") {
Some(Self::Hash)
} else if value.eq_ignore_ascii_case("stream") {
Some(Self::Stream)
} else if value.eq_ignore_ascii_case("none") {
Some(Self::None)
} else {
None
}
}
}
impl FromRedisValue for RedisType {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(_) => Err((ErrorKind::TypeError, "Not a single value").into()),
Value::Data(value) => match core::str::from_utf8(value) {
Ok(value) => match Self::parse(value) {
Some(result) => Ok(result),
None => Err((ErrorKind::TypeError, "Not a type").into()),
},
Err(_) => Err((ErrorKind::TypeError, "Not a string").into()),
},
Value::Nil => Err((ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(response) => match Self::parse(response) {
Some(result) => Ok(result),
None => Err((ErrorKind::TypeError, "Not a type").into()),
},
}
}
fn from_byte_vec(vec: &[u8]) -> Option<Vec<Self>> {
match core::str::from_utf8(vec) {
Ok(value) => Self::parse(value).map(|val| vec![val]),
Err(_) => None,
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum TrimMethod {
MaxLen(u64),
MinId(StreamId),
}
impl ToRedisArgs for TrimMethod {
#[inline(always)]
fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
match self {
Self::MaxLen(threshold) => {
idents::MAXLEN.write_redis_args(out);
threshold.write_redis_args(out);
}
Self::MinId(id) => {
idents::MINID.write_redis_args(out);
id.write_redis_args(out);
}
}
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
false
}
}
#[derive(Debug)]
pub enum StreamIdParseError {
InvalidType,
InvalidTimestamp,
MissingSequence,
InvalidSequence,
}
impl StreamIdParseError {
#[inline(always)]
const fn as_str(&self) -> &'static str {
match self {
Self::InvalidType => "Not a valid stream id",
Self::InvalidTimestamp => "Invalid timestamp",
Self::MissingSequence => "Missing sequence part",
Self::InvalidSequence => "Invalid sequence number",
}
}
}
impl fmt::Display for StreamIdParseError {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.write_str(self.as_str())
}
}
impl std::error::Error for StreamIdParseError {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[repr(transparent)]
pub struct TimestampId {
timestamp: u64,
}
impl TimestampId {
#[inline]
pub fn new(timestamp: time::Duration) -> Self {
Self {
timestamp: match timestamp.as_millis().try_into() {
Ok(res) => res,
Err(_) => u64::max_value(),
},
}
}
}
impl fmt::Debug for TimestampId {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { timestamp } = self;
fmt::Debug::fmt(timestamp, fmt)
}
}
impl fmt::Display for TimestampId {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { timestamp } = self;
fmt.write_fmt(format_args!("{timestamp}-*"))
}
}
impl ToRedisArgs for TimestampId {
#[inline(always)]
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
self.timestamp.write_redis_args(out)
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
true
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct StreamId {
timestamp: u64,
seq: u64,
}
impl StreamId {
#[inline(always)]
pub const fn nil() -> Self {
Self { timestamp: 0, seq: 0 }
}
#[inline(always)]
pub const fn is_nil(&self) -> bool {
self.timestamp == 0 && self.seq == 0
}
#[inline(always)]
pub const fn as_timestamp(&self) -> time::Duration {
time::Duration::from_millis(self.timestamp)
}
pub const fn next(&self) -> Self {
if self.timestamp == u64::max_value() {
Self {
timestamp: self.timestamp,
seq: self.seq.saturating_add(1),
}
} else if self.seq == u64::max_value() {
Self {
timestamp: self.timestamp.saturating_add(1),
seq: 0,
}
} else {
Self {
timestamp: self.timestamp,
seq: self.seq + 1,
}
}
}
pub const fn prev(&self) -> Self {
if self.timestamp == 0 {
Self {
timestamp: self.timestamp,
seq: self.seq.saturating_sub(1),
}
} else if self.seq == 0 {
Self {
timestamp: self.timestamp.saturating_sub(1),
seq: 0,
}
} else {
Self {
timestamp: self.timestamp,
seq: self.seq - 1,
}
}
}
}
impl fmt::Debug for StreamId {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { timestamp, seq } = self;
fmt::Debug::fmt(&(timestamp, seq), fmt)
}
}
impl PartialOrd for StreamId {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
PartialOrd::partial_cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
}
}
impl Ord for StreamId {
#[inline(always)]
fn cmp(&self, other: &Self) -> cmp::Ordering {
Ord::cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
}
}
impl FromStr for StreamId {
type Err = StreamIdParseError;
fn from_str(data: &str) -> Result<Self, Self::Err> {
let mut split = data.split('-');
let timestamp = match split.next() {
Some(timestamp) => match timestamp.parse() {
Ok(timestamp) => timestamp,
Err(_) => {
return Err(StreamIdParseError::InvalidTimestamp);
}
},
None => return Err(StreamIdParseError::InvalidType),
};
let seq = match split.next() {
Some(seq) => match seq.parse() {
Ok(seq) => seq,
Err(_) => return Err(StreamIdParseError::InvalidSequence),
},
None => return Err(StreamIdParseError::MissingSequence),
};
Ok(Self { timestamp, seq })
}
}
impl fmt::Display for StreamId {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { timestamp, seq } = self;
fmt.write_fmt(format_args!("{timestamp}-{seq}"))
}
}
impl ToRedisArgs for StreamId {
#[inline(always)]
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
const STREAM_MAX_SIZE: usize = 20 + 1 + 20;
let mut buf = str_buf::StrBuf::<STREAM_MAX_SIZE>::new();
let _ = fmt::Write::write_fmt(&mut buf, format_args!("{self}"));
out.write_arg_fmt(buf.as_str())
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
true
}
}
impl FromRedisValue for StreamId {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Data(data) => match core::str::from_utf8(data) {
Ok(data) => match data.parse() {
Ok(result) => Ok(result),
Err(error) => Err((redis::ErrorKind::InvalidClientConfig, error.as_str()).into()),
},
Err(_) => Err((redis::ErrorKind::TypeError, "Not a string").into()),
},
Value::Bulk(_) => Err((redis::ErrorKind::TypeError, "Not bulk instead of stream id").into()),
Value::Nil => Ok(StreamId::nil()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
}
#[derive(Debug)]
pub struct GroupInfo {
pub name: String,
pub consumers: u64,
pub pending: u64,
pub last_delivered_id: StreamId,
}
impl GroupInfo {
const USER_FIELD_NAME: &str = "name";
const USER_FIELD_CONSUMERS: &str = "consumers";
const USER_FIELD_PENDING: &str = "pending";
const USER_FIELD_LAST_DELIVERED_ID: &str = "last-delivered-id";
}
impl FromRedisValue for GroupInfo {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
let mut name = None;
let mut consumers = None;
let mut pending = None;
let mut last_delivered_id = None;
if values.len() < 8 {
return Err((
redis::ErrorKind::TypeError,
"Insufficient number of values returned. Need at least 8",
)
.into());
}
for pair in values.chunks(2) {
let key = parse_redis_key(&pair[0])?;
let value = &pair[1];
assign_field_if!(name = value IF key == Self::USER_FIELD_NAME);
assign_field_if!(consumers = value IF key == Self::USER_FIELD_CONSUMERS);
assign_field_if!(pending = value IF key == Self::USER_FIELD_PENDING);
assign_field_if!(last_delivered_id = value IF key == Self::USER_FIELD_LAST_DELIVERED_ID);
}
let name = unwrap_required_field!(name);
let consumers = unwrap_required_field!(consumers);
let pending = unwrap_required_field!(pending);
let last_delivered_id = unwrap_required_field!(last_delivered_id);
Ok(Self {
name,
consumers,
pending,
last_delivered_id,
})
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
}
pub struct PendingConsumerStat {
pub name: String,
pub no_ack_num: u64,
}
impl FromRedisValue for PendingConsumerStat {
fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
match value {
Value::Bulk(values) => {
if values.len() == 2 {
Ok(Self {
name: FromRedisValue::from_redis_value(&values[0])?,
no_ack_num: FromRedisValue::from_redis_value(&values[1])?,
})
} else {
Err((
redis::ErrorKind::TypeError,
"PendingConsumerStat array requires 2 elements",
)
.into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingConsumerStat array").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
}
pub struct PendingStats {
pub len: u64,
pub lowest_id: StreamId,
pub highest_id: StreamId,
pub consumers: Vec<PendingConsumerStat>,
}
impl FromRedisValue for PendingStats {
fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
match value {
Value::Bulk(values) => {
if values.len() == 4 {
Ok(Self {
len: FromRedisValue::from_redis_value(&values[0])?,
lowest_id: FromRedisValue::from_redis_value(&values[1])?,
highest_id: FromRedisValue::from_redis_value(&values[2])?,
consumers: FromRedisValue::from_redis_value(&values[3])?,
})
} else {
Err((redis::ErrorKind::TypeError, "PendingStats array requires 4 elements").into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingStats array").into()),
Value::Nil => Ok(Self {
len: 0,
lowest_id: StreamId::nil(),
highest_id: StreamId::nil(),
consumers: Vec::new(),
}),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct EntryValue<T> {
pub id: uuid::Uuid,
pub payload: T,
}
impl<T> EntryValue<T> {
const USER_FIELD_ID: &str = "id";
const USER_FIELD_DATA: &str = "payload";
}
impl<T: FromRedisValue> FromRedisValue for EntryValue<T> {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
let mut id = None;
let mut payload = None;
for pair in values.chunks(2) {
let key = parse_redis_key(&pair[0])?;
let value = &pair[1];
if id.is_none() && key.eq_ignore_ascii_case(Self::USER_FIELD_ID) {
id = Some(value);
}
assign_field_if!(payload = value IF key == Self::USER_FIELD_DATA);
}
let id = match id {
Some(id) => match id {
Value::Data(data) => {
let data: [u8; 16] = match data.as_slice().try_into() {
Ok(data) => data,
Err(_) => return Err((redis::ErrorKind::TypeError, "id field is not 16 bytes").into()),
};
let data = u128::from_le_bytes(data);
uuid::Uuid::from_u128(data)
}
_ => return Err((redis::ErrorKind::TypeError, "id field is not bytes").into()),
},
None => return Err((redis::ErrorKind::TypeError, "Missing id field").into()),
};
let payload = unwrap_required_field!(payload);
Ok(Self { id, payload })
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream values").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
}
impl<T: ToRedisArgs> ToRedisArgs for EntryValue<T> {
#[inline(always)]
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
Self::USER_FIELD_ID.write_redis_args(out);
out.write_arg(&self.id.as_u128().to_le_bytes());
Self::USER_FIELD_DATA.write_redis_args(out);
self.payload.write_redis_args(out);
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
false
}
}
#[derive(Copy, Clone)]
pub enum RangeIdx {
Any,
Timestamp(TimestampId),
Id(StreamId),
ExcludeId(StreamId),
}
impl fmt::Debug for RangeIdx {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Any => fmt.write_str("Any"),
Self::Timestamp(time) => fmt::Debug::fmt(time, fmt),
Self::Id(id) => fmt::Debug::fmt(id, fmt),
Self::ExcludeId(id) => {
fmt.write_str("Exclude(")?;
fmt::Debug::fmt(id, fmt)?;
fmt.write_str(")")
}
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct Range {
pub start: RangeIdx,
pub end: RangeIdx,
}
impl ToRedisArgs for Range {
#[inline(always)]
fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
let Self { start, end } = self;
match start {
RangeIdx::Any => "-".write_redis_args(out),
RangeIdx::Timestamp(id) => id.write_redis_args(out),
RangeIdx::Id(id) => id.write_redis_args(out),
RangeIdx::ExcludeId(id) => id.next().write_redis_args(out),
}
match end {
RangeIdx::Any => "+".write_redis_args(out),
RangeIdx::Timestamp(id) => id.write_redis_args(out),
RangeIdx::Id(id) => id.write_redis_args(out),
RangeIdx::ExcludeId(id) => id.prev().write_redis_args(out),
}
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
false
}
}
pub struct PendingParams<'a> {
pub group: &'a str,
pub range: Range,
pub idle: Option<time::Duration>,
pub consumer: Option<&'a str>,
pub count: usize,
}
pub(crate) struct PendingParamsConfig<'a> {
pub config: &'a QueueConfig,
pub params: &'a PendingParams<'a>,
}
impl<'a> ToRedisArgs for PendingParamsConfig<'a> {
#[inline(always)]
fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
let Self { config, params } = self;
config.stream.as_ref().write_redis_args(out);
params.group.write_redis_args(out);
if let Some(idle) = params.idle {
let idle: u64 = match idle.as_millis().try_into() {
Ok(idle) => idle,
Err(_) => u64::max_value(),
};
idents::IDLE.as_bytes().write_redis_args(out);
idle.write_redis_args(out);
}
params.range.write_redis_args(out);
params.count.write_redis_args(out);
if let Some(consumer) = ¶ms.consumer {
consumer.write_redis_args(out)
}
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
false
}
}
#[derive(Debug)]
pub struct PendingEntry {
pub id: StreamId,
pub consumer: String,
pub last_delivered: time::Duration,
pub count: u64,
}
impl FromRedisValue for PendingEntry {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
if values.len() == 4 {
Ok(Self {
id: StreamId::from_redis_value(&values[0])?,
consumer: String::from_redis_value(&values[1])?,
last_delivered: time::Duration::from_millis(u64::from_redis_value(&values[2])?),
count: u64::from_redis_value(&values[3])?,
})
} else {
Err((
redis::ErrorKind::TypeError,
"Invalid number of values in PendingEntry, should be 4",
)
.into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
#[inline]
fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
None
}
}
pub enum FetchType {
New,
Pending,
After(StreamId),
}
impl ToRedisArgs for FetchType {
#[inline(always)]
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
match self {
Self::New => out.write_arg(b">"),
Self::Pending => out.write_arg(b"0"),
Self::After(id) => id.write_redis_args(out),
}
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
true
}
}
pub struct FetchParams<'a> {
pub group: &'a str,
pub consumer: &'a str,
pub typ: FetchType,
pub count: usize,
pub timeout: Option<time::Duration>,
}
pub(crate) struct FetchParamsConfig<'a> {
pub config: &'a QueueConfig,
pub params: &'a FetchParams<'a>,
}
impl<'a> ToRedisArgs for FetchParamsConfig<'a> {
#[inline(always)]
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
let Self { config, params } = self;
out.write_arg(idents::GROUP.as_bytes());
params.group.write_redis_args(out);
params.consumer.write_redis_args(out);
if params.count > 0 {
out.write_arg(idents::COUNT.as_bytes());
params.count.write_redis_args(out);
}
if let Some(timeout) = params.timeout {
out.write_arg(idents::BLOCK.as_bytes());
(timeout.as_millis() as u64).write_redis_args(out);
}
out.write_arg(idents::STREAMS.as_bytes());
config.stream.as_ref().write_redis_args(out);
params.typ.write_redis_args(out);
}
#[inline(always)]
fn is_single_arg(&self) -> bool {
false
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Entry<T> {
pub id: StreamId,
pub value: EntryValue<T>,
}
impl<T: FromRedisValue> FromRedisValue for Entry<T> {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
if values.len() == 2 {
Ok(Self {
id: StreamId::from_redis_value(&values[0])?,
value: EntryValue::<T>::from_redis_value(&values[1])?,
})
} else {
Err((
redis::ErrorKind::TypeError,
"Invalid number of values in entry, should be 2",
)
.into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
None
}
}
pub struct FetchResult<T> {
pub stream: String,
pub entries: Vec<Entry<T>>,
}
impl<T: FromRedisValue> FromRedisValue for FetchResult<T> {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
if values.len() == 2 {
Ok(Self {
stream: String::from_redis_value(&values[0])?,
entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
})
} else {
Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
None
}
}
pub struct FetchEntries<T> {
pub entries: Vec<Entry<T>>,
}
impl<T: FromRedisValue> FromRedisValue for FetchEntries<T> {
fn from_redis_value(value: &Value) -> RedisResult<Self> {
match value {
Value::Bulk(values) => {
if values.len() == 2 {
Ok(Self {
entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
})
} else {
Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
}
}
Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
}
}
fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
None
}
}