#[cfg(feature = "streams")]
use crate::{
FromRedisValue, RedisWrite, ToRedisArgs, Value,
errors::{ParsingError, invalid_type_error},
types::HashMap,
};
use crate::{from_redis_value, from_redis_value_ref, types::ToSingleRedisArg};
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
#[non_exhaustive]
pub enum StreamMaxlen {
Equals(usize),
Approx(usize),
}
impl ToRedisArgs for StreamMaxlen {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let (ch, val) = match *self {
StreamMaxlen::Equals(v) => ("=", v),
StreamMaxlen::Approx(v) => ("~", v),
};
out.write_arg(b"MAXLEN");
out.write_arg(ch.as_bytes());
val.write_redis_args(out);
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimmingMode {
Exact,
Approx,
}
impl ToRedisArgs for StreamTrimmingMode {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
match self {
Self::Exact => out.write_arg(b"="),
Self::Approx => out.write_arg(b"~"),
};
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimStrategy {
MaxLen(StreamTrimmingMode, usize, Option<usize>),
MinId(StreamTrimmingMode, String, Option<usize>),
}
impl StreamTrimStrategy {
pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
Self::MaxLen(trim, max_entries, None)
}
pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
Self::MinId(trim, stream_id.into(), None)
}
pub fn limit(self, limit: usize) -> Self {
match self {
StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
}
}
}
impl ToRedisArgs for StreamTrimStrategy {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let limit = match self {
StreamTrimStrategy::MaxLen(m, t, limit) => {
out.write_arg(b"MAXLEN");
m.write_redis_args(out);
t.write_redis_args(out);
limit
}
StreamTrimStrategy::MinId(m, t, limit) => {
out.write_arg(b"MINID");
m.write_redis_args(out);
t.write_redis_args(out);
limit
}
};
if let Some(limit) = limit {
out.write_arg(b"LIMIT");
limit.write_redis_args(out);
}
}
}
#[derive(Debug)]
pub struct StreamTrimOptions {
strategy: StreamTrimStrategy,
deletion_policy: Option<StreamDeletionPolicy>,
}
impl StreamTrimOptions {
pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
Self {
strategy: StreamTrimStrategy::maxlen(mode, max_entries),
deletion_policy: None,
}
}
pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
Self {
strategy: StreamTrimStrategy::minid(mode, stream_id),
deletion_policy: None,
}
}
pub fn limit(mut self, limit: usize) -> Self {
self.strategy = self.strategy.limit(limit);
self
}
pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
self.deletion_policy = Some(deletion_policy);
self
}
}
impl ToRedisArgs for StreamTrimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
self.strategy.write_redis_args(out);
if let Some(deletion_policy) = self.deletion_policy.as_ref() {
deletion_policy.write_redis_args(out);
}
}
}
#[derive(Default, Debug)]
pub struct StreamAddOptions {
nomkstream: bool,
trim: Option<StreamTrimStrategy>,
deletion_policy: Option<StreamDeletionPolicy>,
}
impl StreamAddOptions {
pub fn nomkstream(mut self) -> Self {
self.nomkstream = true;
self
}
pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
self.trim = Some(trim);
self
}
pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
self.deletion_policy = Some(deletion_policy);
self
}
}
impl ToRedisArgs for StreamAddOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if self.nomkstream {
out.write_arg(b"NOMKSTREAM");
}
if let Some(strategy) = self.trim.as_ref() {
strategy.write_redis_args(out);
}
if let Some(deletion_policy) = self.deletion_policy.as_ref() {
deletion_policy.write_redis_args(out);
}
}
}
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
count: Option<usize>,
justid: bool,
}
impl StreamAutoClaimOptions {
pub fn count(mut self, n: usize) -> Self {
self.count = Some(n);
self
}
pub fn with_justid(mut self) -> Self {
self.justid = true;
self
}
}
impl ToRedisArgs for StreamAutoClaimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref count) = self.count {
out.write_arg(b"COUNT");
out.write_arg(format!("{count}").as_bytes());
}
if self.justid {
out.write_arg(b"JUSTID");
}
}
}
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
idle: Option<usize>,
time: Option<usize>,
retry: Option<usize>,
force: bool,
justid: bool,
lastid: Option<String>,
}
impl StreamClaimOptions {
pub fn idle(mut self, ms: usize) -> Self {
self.idle = Some(ms);
self
}
pub fn time(mut self, ms_time: usize) -> Self {
self.time = Some(ms_time);
self
}
pub fn retry(mut self, count: usize) -> Self {
self.retry = Some(count);
self
}
pub fn with_force(mut self) -> Self {
self.force = true;
self
}
pub fn with_justid(mut self) -> Self {
self.justid = true;
self
}
pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
self.lastid = Some(lastid.into());
self
}
}
impl ToRedisArgs for StreamClaimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref ms) = self.idle {
out.write_arg(b"IDLE");
out.write_arg(format!("{ms}").as_bytes());
}
if let Some(ref ms_time) = self.time {
out.write_arg(b"TIME");
out.write_arg(format!("{ms_time}").as_bytes());
}
if let Some(ref count) = self.retry {
out.write_arg(b"RETRYCOUNT");
out.write_arg(format!("{count}").as_bytes());
}
if self.force {
out.write_arg(b"FORCE");
}
if self.justid {
out.write_arg(b"JUSTID");
}
if let Some(ref lastid) = self.lastid {
out.write_arg(b"LASTID");
lastid.write_redis_args(out);
}
}
}
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
#[derive(Default, Debug)]
pub struct StreamReadOptions {
block: Option<usize>,
count: Option<usize>,
noack: Option<bool>,
group: SRGroup,
claim: Option<usize>,
}
impl StreamReadOptions {
pub fn read_only(&self) -> bool {
self.group.is_none()
}
pub fn noack(mut self) -> Self {
self.noack = Some(true);
self
}
pub fn block(mut self, ms: usize) -> Self {
self.block = Some(ms);
self
}
pub fn count(mut self, n: usize) -> Self {
self.count = Some(n);
self
}
pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
mut self,
group_name: GN,
consumer_name: CN,
) -> Self {
self.group = Some((
ToRedisArgs::to_redis_args(&group_name),
ToRedisArgs::to_redis_args(&consumer_name),
));
self
}
pub fn claim(mut self, min_idle_time: usize) -> Self {
self.claim = Some(min_idle_time);
self
}
}
impl ToRedisArgs for StreamReadOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref group) = self.group {
out.write_arg(b"GROUP");
for i in &group.0 {
out.write_arg(i);
}
for i in &group.1 {
out.write_arg(i);
}
}
if let Some(ref ms) = self.block {
out.write_arg(b"BLOCK");
out.write_arg(format!("{ms}").as_bytes());
}
if let Some(ref n) = self.count {
out.write_arg(b"COUNT");
out.write_arg(format!("{n}").as_bytes());
}
if self.group.is_some() {
if self.noack == Some(true) {
out.write_arg(b"NOACK");
}
if let Some(ref min_idle_time) = self.claim {
out.write_arg(b"CLAIM");
out.write_arg(format!("{min_idle_time}").as_bytes());
}
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
pub next_stream_id: String,
pub claimed: Vec<StreamId>,
pub deleted_ids: Vec<String>,
pub invalid_entries: bool,
}
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
pub keys: Vec<StreamKey>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
pub ids: Vec<StreamId>,
}
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamPendingReply {
#[default]
Empty,
Data(StreamPendingData),
}
impl StreamPendingReply {
pub fn count(&self) -> usize {
match self {
StreamPendingReply::Empty => 0,
StreamPendingReply::Data(x) => x.count,
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
pub count: usize,
pub start_id: String,
pub end_id: String,
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
pub ids: Vec<StreamPendingId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
pub last_generated_id: String,
pub radix_tree_keys: usize,
pub groups: usize,
pub length: usize,
pub first_entry: StreamId,
pub last_entry: StreamId,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
pub groups: Vec<StreamInfoGroup>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
pub name: String,
pub pending: usize,
pub idle: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
pub name: String,
pub consumers: usize,
pub pending: usize,
pub last_delivered_id: String,
pub entries_read: Option<usize>,
pub lag: Option<usize>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
pub id: String,
pub consumer: String,
pub last_delivered_ms: usize,
pub times_delivered: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
pub key: String,
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone, PartialEq)]
pub struct StreamId {
pub id: String,
pub map: HashMap<String, Value>,
pub milliseconds_elapsed_from_delivery: Option<usize>,
pub delivered_count: Option<usize>,
}
impl StreamId {
fn from_array_value(v: Value) -> Result<Self, ParsingError> {
let mut stream_id = StreamId::default();
if let Value::Array(mut values) = v {
if let Some(v) = values.first_mut() {
stream_id.id = from_redis_value(std::mem::take(v))?;
}
if let Some(v) = values.first_mut() {
stream_id.map = from_redis_value(std::mem::take(v))?;
}
}
Ok(stream_id)
}
pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
match self.map.get(key) {
Some(x) => from_redis_value_ref(x).ok(),
None => None,
}
}
pub fn contains_key(&self, key: &str) -> bool {
self.map.contains_key(key)
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
impl FromRedisValue for StreamAutoClaimReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let Value::Array(mut items) = v else {
invalid_type_error!("Not a array response", v);
};
if items.len() > 3 || items.len() < 2 {
invalid_type_error!("Incorrect number of items", &items);
}
let deleted_ids = if items.len() == 3 {
from_redis_value(items.pop().unwrap())?
} else {
Vec::new()
};
let claimed = items.pop().unwrap();
let next_stream_id = from_redis_value(items.pop().unwrap())?;
let Value::Array(arr) = &claimed else {
invalid_type_error!("Incorrect type", claimed)
};
let Some(entry) = arr.iter().find(|val| !matches!(val, Value::Nil)) else {
return Ok(Self {
next_stream_id,
claimed: Vec::new(),
deleted_ids,
invalid_entries: !arr.is_empty(),
});
};
let (claimed, invalid_entries) = match entry {
Value::BulkString(_) => {
let claimed_count = arr.len();
let ids: Vec<Option<String>> = from_redis_value(claimed)?;
let claimed: Vec<_> = ids
.into_iter()
.filter_map(|id| {
id.map(|id| StreamId {
id,
..Default::default()
})
})
.collect();
let invalid_entries = claimed.len() < claimed_count;
(claimed, invalid_entries)
}
Value::Array(_) => {
let claimed_count = arr.len();
let rows: SACRows = from_redis_value(claimed)?;
let claimed: Vec<_> = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(id, map)| StreamId {
id,
map,
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
})
})
.collect();
let invalid_entries = claimed.len() < claimed_count;
(claimed, invalid_entries)
}
_ => invalid_type_error!("Incorrect type", claimed),
};
Ok(Self {
next_stream_id,
claimed,
deleted_ids,
invalid_entries,
})
}
}
type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
type SRClaimRows =
Vec<HashMap<String, Vec<(String, HashMap<String, Value>, Option<usize>, Option<usize>)>>>;
impl FromRedisValue for StreamReadReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
if let Ok(rows) = from_redis_value::<SRRows>(v.clone()) {
return Ok(Self::from_standard_rows(rows));
}
if let Ok(rows) = from_redis_value::<SRClaimRows>(v.clone()) {
return Ok(Self::from_claim_rows(rows));
}
invalid_type_error!("Could not parse StreamReadReply in any known format", v)
}
}
impl StreamReadReply {
fn from_standard_rows(rows: SRRows) -> Self {
let keys = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(key, entries)| StreamKey {
key,
ids: entries
.into_iter()
.flat_map(|id_row| {
id_row.into_iter().map(|(id, map)| StreamId {
id,
map,
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
})
})
.collect(),
})
})
.collect();
StreamReadReply { keys }
}
fn from_claim_rows(rows: SRClaimRows) -> Self {
let keys = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(key, entries)| StreamKey {
key,
ids: entries
.into_iter()
.map(
|(id, map, milliseconds_elapsed_from_delivery, delivered_count)| {
StreamId {
id,
map,
milliseconds_elapsed_from_delivery,
delivered_count,
}
},
)
.collect(),
})
})
.collect();
StreamReadReply { keys }
}
}
impl FromRedisValue for StreamRangeReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(id, map)| StreamId {
id,
map,
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
})
})
.collect();
Ok(StreamRangeReply { ids })
}
}
impl FromRedisValue for StreamClaimReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(id, map)| StreamId {
id,
map,
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
})
})
.collect();
Ok(StreamClaimReply { ids })
}
}
type SPRInner = (
usize,
Option<String>,
Option<String>,
Vec<Option<(String, String)>>,
);
impl FromRedisValue for StreamPendingReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
if count == 0 {
Ok(StreamPendingReply::Empty)
} else {
let mut result = StreamPendingData::default();
let start_id = start.ok_or_else(|| {
ParsingError::from(arcstr::literal!(
"IllegalState: Non-zero pending expects start id"
))
})?;
let end_id = end.ok_or_else(|| {
ParsingError::from(arcstr::literal!(
"IllegalState: Non-zero pending expects end id"
))
})?;
result.count = count;
result.start_id = start_id;
result.end_id = end_id;
result.consumers = consumer_data
.into_iter()
.flatten()
.map(|(name, pending)| StreamInfoConsumer {
name,
pending: pending.parse().unwrap_or_default(),
..Default::default()
})
.collect();
Ok(StreamPendingReply::Data(result))
}
}
}
impl FromRedisValue for StreamPendingCountReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let mut reply = StreamPendingCountReply::default();
match v {
Value::Array(outer_tuple) => {
for outer in outer_tuple {
match outer {
Value::Array(inner_tuple) => match &inner_tuple[..] {
[
Value::BulkString(id_bytes),
Value::BulkString(consumer_bytes),
Value::Int(last_delivered_ms_u64),
Value::Int(times_delivered_u64),
] => {
let id = String::from_utf8(id_bytes.to_vec())?;
let consumer = String::from_utf8(consumer_bytes.to_vec())?;
let last_delivered_ms = *last_delivered_ms_u64 as usize;
let times_delivered = *times_delivered_u64 as usize;
reply.ids.push(StreamPendingId {
id,
consumer,
last_delivered_ms,
times_delivered,
});
}
_ => fail!(ParsingError::from(arcstr::literal!(
"Cannot parse redis data (3)"
))),
},
_ => fail!(ParsingError::from(arcstr::literal!(
"Cannot parse redis data (2)"
))),
}
}
}
_ => fail!(ParsingError::from(arcstr::literal!(
"Cannot parse redis data (1)"
))),
};
Ok(reply)
}
}
impl FromRedisValue for StreamInfoStreamReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let mut map: HashMap<String, Value> = from_redis_value(v)?;
let mut reply = StreamInfoStreamReply::default();
if let Some(v) = map.remove("last-generated-id") {
reply.last_generated_id = from_redis_value(v)?;
}
if let Some(v) = map.remove("radix-tree-nodes") {
reply.radix_tree_keys = from_redis_value(v)?;
}
if let Some(v) = map.remove("groups") {
reply.groups = from_redis_value(v)?;
}
if let Some(v) = map.remove("length") {
reply.length = from_redis_value(v)?;
}
if let Some(v) = map.remove("first-entry") {
reply.first_entry = StreamId::from_array_value(v)?;
}
if let Some(v) = map.remove("last-entry") {
reply.last_entry = StreamId::from_array_value(v)?;
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoConsumersReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoConsumersReply::default();
for mut map in consumers {
let mut c = StreamInfoConsumer::default();
if let Some(v) = map.remove("name") {
c.name = from_redis_value(v)?;
}
if let Some(v) = map.remove("pending") {
c.pending = from_redis_value(v)?;
}
if let Some(v) = map.remove("idle") {
c.idle = from_redis_value(v)?;
}
reply.consumers.push(c);
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoGroupsReply {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoGroupsReply::default();
for mut map in groups {
let mut g = StreamInfoGroup::default();
if let Some(v) = map.remove("name") {
g.name = from_redis_value(v)?;
}
if let Some(v) = map.remove("pending") {
g.pending = from_redis_value(v)?;
}
if let Some(v) = map.remove("consumers") {
g.consumers = from_redis_value(v)?;
}
if let Some(v) = map.remove("last-delivered-id") {
g.last_delivered_id = from_redis_value(v)?;
}
if let Some(v) = map.remove("entries-read") {
g.entries_read = if let Value::Nil = v {
None
} else {
Some(from_redis_value(v)?)
};
}
if let Some(v) = map.remove("lag") {
g.lag = if let Value::Nil = v {
None
} else {
Some(from_redis_value(v)?)
};
}
reply.groups.push(g);
}
Ok(reply)
}
}
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamDeletionPolicy {
#[default]
KeepRef,
DelRef,
Acked,
}
impl ToRedisArgs for StreamDeletionPolicy {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
match self {
StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
}
}
}
impl ToSingleRedisArg for StreamDeletionPolicy {}
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XDelExStatusCode {
IdNotFound = -1,
Deleted = 1,
NotDeletedUnacknowledgedOrStillReferenced = 2,
}
#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
match v {
Value::Int(code) => match code {
-1 => Ok(XDelExStatusCode::IdNotFound),
1 => Ok(XDelExStatusCode::Deleted),
2 => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
_ => Err(format!("Invalid XDelExStatusCode status code: {code}").into()),
},
_ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
}
}
}
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XAckDelStatusCode {
IdNotFound = -1,
AcknowledgedAndDeleted = 1,
AcknowledgedNotDeletedStillReferenced = 2,
}
#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
match v {
Value::Int(code) => match code {
-1 => Ok(XAckDelStatusCode::IdNotFound),
1 => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
2 => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
_ => Err(arcstr::literal!("Invalid XAckDelStatusCode status code: {code}").into()),
},
_ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
let mut out: Vec<Vec<u8>> = Vec::new();
object.write_redis_args(&mut out);
let mut cmd: Vec<u8> = Vec::new();
out.iter_mut().for_each(|item| {
cmd.append(item);
cmd.push(b' ');
});
cmd.pop();
assert_eq!(cmd, expected);
}
mod stream_auto_claim_reply {
use super::*;
use crate::Value;
#[test]
fn short_response() {
let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
StreamAutoClaimReply::from_redis_value(value).unwrap_err();
}
#[test]
fn parses_none_claimed_response() {
let value = Value::Array(vec![
Value::BulkString("0-0".into()),
Value::Array(vec![]),
Value::Array(vec![]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "0-0");
assert_eq!(reply.claimed.len(), 0);
assert_eq!(reply.deleted_ids.len(), 0);
}
#[test]
fn parses_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("test".into()),
Value::BulkString("other".into()),
Value::BulkString("whaterver".into()),
]),
]),
Value::Array(vec![
Value::BulkString("1713465536069-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("another test".into()),
Value::BulkString("other".into()),
Value::BulkString("something".into()),
]),
]),
]),
Value::Array(vec![Value::BulkString("123456789-0".into())]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
assert!(
matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
);
assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
assert_eq!(reply.deleted_ids.len(), 1);
assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
}
#[test]
fn parses_v6_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("test".into()),
Value::BulkString("other".into()),
Value::BulkString("whaterver".into()),
]),
]),
Value::Array(vec![
Value::BulkString("1713465536069-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("another test".into()),
Value::BulkString("other".into()),
Value::BulkString("something".into()),
]),
]),
]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 0);
}
#[test]
fn parses_justid_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::BulkString("1713465536069-0".into()),
]),
Value::Array(vec![Value::BulkString("123456789-0".into())]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 1);
assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
}
#[test]
fn parses_v6_justid_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::BulkString("1713465536069-0".into()),
]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 0);
}
}
mod stream_trim_options {
use super::*;
#[test]
fn maxlen_trim() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
assert_command_eq(options, b"MAXLEN ~ 10");
}
#[test]
fn maxlen_exact_trim() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
assert_command_eq(options, b"MAXLEN = 10");
}
#[test]
fn maxlen_trim_limit() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
}
#[test]
fn minid_trim_limit() {
let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
}
}
mod stream_add_options {
use super::*;
#[test]
fn the_default() {
let options = StreamAddOptions::default();
assert_command_eq(options, b"");
}
#[test]
fn with_maxlen_trim() {
let options = StreamAddOptions::default()
.trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
assert_command_eq(options, b"MAXLEN = 10");
}
#[test]
fn with_nomkstream() {
let options = StreamAddOptions::default().nomkstream();
assert_command_eq(options, b"NOMKSTREAM");
}
#[test]
fn with_nomkstream_and_maxlen_trim() {
let options = StreamAddOptions::default()
.nomkstream()
.trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
}
}
}