use crate::{
errors::{invalid_type_error, ParsingError},
from_redis_value,
types::HashMap,
FromRedisValue, RedisWrite, ToRedisArgs, Value,
};
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum StreamMaxlen {
Equals(usize),
Approx(usize),
}
impl ToRedisArgs for StreamMaxlen {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let (ch, val) = match *self {
StreamMaxlen::Equals(v) => ("=", v),
StreamMaxlen::Approx(v) => ("~", v),
};
out.write_arg(b"MAXLEN");
out.write_arg(ch.as_bytes());
val.write_redis_args(out);
}
}
#[derive(Debug)]
pub enum StreamTrimmingMode {
Exact,
Approx,
}
impl ToRedisArgs for StreamTrimmingMode {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
match self {
Self::Exact => out.write_arg(b"="),
Self::Approx => out.write_arg(b"~"),
};
}
}
#[derive(Debug)]
pub enum StreamTrimStrategy {
MaxLen(StreamTrimmingMode, usize, Option<usize>),
MinId(StreamTrimmingMode, String, Option<usize>),
}
impl StreamTrimStrategy {
pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
Self::MaxLen(trim, max_entries, None)
}
pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
Self::MinId(trim, stream_id.into(), None)
}
pub fn limit(self, limit: usize) -> Self {
match self {
StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
}
}
}
impl ToRedisArgs for StreamTrimStrategy {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let limit = match self {
StreamTrimStrategy::MaxLen(m, t, limit) => {
out.write_arg(b"MAXLEN");
m.write_redis_args(out);
t.write_redis_args(out);
limit
}
StreamTrimStrategy::MinId(m, t, limit) => {
out.write_arg(b"MINID");
m.write_redis_args(out);
t.write_redis_args(out);
limit
}
};
if let Some(limit) = limit {
out.write_arg(b"LIMIT");
limit.write_redis_args(out);
}
}
}
#[derive(Debug)]
pub struct StreamTrimOptions {
strategy: StreamTrimStrategy,
}
impl StreamTrimOptions {
pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
Self {
strategy: StreamTrimStrategy::maxlen(mode, max_entries),
}
}
pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
Self {
strategy: StreamTrimStrategy::minid(mode, stream_id),
}
}
pub fn limit(mut self, limit: usize) -> Self {
self.strategy = self.strategy.limit(limit);
self
}
}
impl ToRedisArgs for StreamTrimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
self.strategy.write_redis_args(out);
}
}
#[derive(Default, Debug)]
pub struct StreamAddOptions {
nomkstream: bool,
trim: Option<StreamTrimStrategy>,
}
impl StreamAddOptions {
pub fn nomkstream(mut self) -> Self {
self.nomkstream = true;
self
}
pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
self.trim = Some(trim);
self
}
}
impl ToRedisArgs for StreamAddOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if self.nomkstream {
out.write_arg(b"NOMKSTREAM");
}
if let Some(strategy) = self.trim.as_ref() {
strategy.write_redis_args(out);
}
}
}
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
count: Option<usize>,
justid: bool,
}
impl StreamAutoClaimOptions {
pub fn count(mut self, n: usize) -> Self {
self.count = Some(n);
self
}
pub fn with_justid(mut self) -> Self {
self.justid = true;
self
}
}
impl ToRedisArgs for StreamAutoClaimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref count) = self.count {
out.write_arg(b"COUNT");
out.write_arg(format!("{count}").as_bytes());
}
if self.justid {
out.write_arg(b"JUSTID");
}
}
}
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
idle: Option<usize>,
time: Option<usize>,
retry: Option<usize>,
force: bool,
justid: bool,
lastid: Option<String>,
}
impl StreamClaimOptions {
pub fn idle(mut self, ms: usize) -> Self {
self.idle = Some(ms);
self
}
pub fn time(mut self, ms_time: usize) -> Self {
self.time = Some(ms_time);
self
}
pub fn retry(mut self, count: usize) -> Self {
self.retry = Some(count);
self
}
pub fn with_force(mut self) -> Self {
self.force = true;
self
}
pub fn with_justid(mut self) -> Self {
self.justid = true;
self
}
pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
self.lastid = Some(lastid.into());
self
}
}
impl ToRedisArgs for StreamClaimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref ms) = self.idle {
out.write_arg(b"IDLE");
out.write_arg(format!("{ms}").as_bytes());
}
if let Some(ref ms_time) = self.time {
out.write_arg(b"TIME");
out.write_arg(format!("{ms_time}").as_bytes());
}
if let Some(ref count) = self.retry {
out.write_arg(b"RETRYCOUNT");
out.write_arg(format!("{count}").as_bytes());
}
if self.force {
out.write_arg(b"FORCE");
}
if self.justid {
out.write_arg(b"JUSTID");
}
if let Some(ref lastid) = self.lastid {
out.write_arg(b"LASTID");
lastid.write_redis_args(out);
}
}
}
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
#[derive(Default, Debug)]
pub struct StreamReadOptions {
block: Option<usize>,
count: Option<usize>,
noack: Option<bool>,
group: SRGroup,
}
impl StreamReadOptions {
pub fn read_only(&self) -> bool {
self.group.is_none()
}
pub fn noack(mut self) -> Self {
self.noack = Some(true);
self
}
pub fn block(mut self, ms: usize) -> Self {
self.block = Some(ms);
self
}
pub fn count(mut self, n: usize) -> Self {
self.count = Some(n);
self
}
pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
mut self,
group_name: GN,
consumer_name: CN,
) -> Self {
self.group = Some((
ToRedisArgs::to_redis_args(&group_name),
ToRedisArgs::to_redis_args(&consumer_name),
));
self
}
}
impl ToRedisArgs for StreamReadOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref group) = self.group {
out.write_arg(b"GROUP");
for i in &group.0 {
out.write_arg(i);
}
for i in &group.1 {
out.write_arg(i);
}
}
if let Some(ref ms) = self.block {
out.write_arg(b"BLOCK");
out.write_arg(format!("{ms}").as_bytes());
}
if let Some(ref n) = self.count {
out.write_arg(b"COUNT");
out.write_arg(format!("{n}").as_bytes());
}
if self.group.is_some() {
if self.noack == Some(true) {
out.write_arg(b"NOACK");
}
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
pub next_stream_id: String,
pub claimed: Vec<StreamId>,
pub deleted_ids: Vec<String>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
pub keys: Vec<StreamKey>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
pub ids: Vec<StreamId>,
}
#[derive(Debug, Clone, Default)]
pub enum StreamPendingReply {
#[default]
Empty,
Data(StreamPendingData),
}
impl StreamPendingReply {
pub fn count(&self) -> usize {
match self {
StreamPendingReply::Empty => 0,
StreamPendingReply::Data(x) => x.count,
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
pub count: usize,
pub start_id: String,
pub end_id: String,
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
pub ids: Vec<StreamPendingId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
pub last_generated_id: String,
pub radix_tree_keys: usize,
pub groups: usize,
pub length: usize,
pub first_entry: StreamId,
pub last_entry: StreamId,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
pub groups: Vec<StreamInfoGroup>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
pub name: String,
pub pending: usize,
pub idle: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
pub name: String,
pub consumers: usize,
pub pending: usize,
pub last_delivered_id: String,
pub entries_read: Option<usize>,
pub lag: Option<usize>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
pub id: String,
pub consumer: String,
pub last_delivered_ms: usize,
pub times_delivered: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
pub key: String,
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamId {
pub id: String,
pub map: HashMap<String, Value>,
}
impl StreamId {
fn from_array_value(v: &Value) -> Result<Self, ParsingError> {
let mut stream_id = StreamId::default();
if let Value::Array(ref values) = *v {
if let Some(v) = values.first() {
stream_id.id = from_redis_value(v)?;
}
if let Some(v) = values.get(1) {
stream_id.map = from_redis_value(v)?;
}
}
Ok(stream_id)
}
pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
match self.map.get(key) {
Some(x) => from_redis_value(x).ok(),
None => None,
}
}
pub fn contains_key(&self, key: &str) -> bool {
self.map.contains_key(key)
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
impl FromRedisValue for StreamAutoClaimReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
match *v {
Value::Array(ref items) => {
if let 2..=3 = items.len() {
let deleted_ids = if let Some(o) = items.get(2) {
from_redis_value(o)?
} else {
Vec::new()
};
let claimed: Vec<StreamId> = match &items[1] {
Value::Array(x)
if matches!(x.first(), None | Some(Value::BulkString(_))) =>
{
let ids: Vec<String> = from_redis_value(&items[1])?;
ids.into_iter()
.map(|id| StreamId {
id,
..Default::default()
})
.collect()
}
Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
let rows: SACRows = from_redis_value(&items[1])?;
rows.into_iter()
.flat_map(|id_row| {
id_row.into_iter().map(|(id, map)| StreamId { id, map })
})
.collect()
}
_ => invalid_type_error!("Incorrect type", &items[1]),
};
Ok(Self {
next_stream_id: from_redis_value(&items[0])?,
claimed,
deleted_ids,
})
} else {
invalid_type_error!("Wrong number of entries in array response", v)
}
}
_ => invalid_type_error!("Not a array response", v),
}
}
}
type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
impl FromRedisValue for StreamReadReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let rows: SRRows = from_redis_value(v)?;
let keys = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(key, entry)| {
let ids = entry
.into_iter()
.flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
StreamKey { key, ids }
})
})
.collect();
Ok(StreamReadReply { keys })
}
}
impl FromRedisValue for StreamRangeReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
Ok(StreamRangeReply { ids })
}
}
impl FromRedisValue for StreamClaimReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
Ok(StreamClaimReply { ids })
}
}
type SPRInner = (
usize,
Option<String>,
Option<String>,
Vec<Option<(String, String)>>,
);
impl FromRedisValue for StreamPendingReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
if count == 0 {
Ok(StreamPendingReply::Empty)
} else {
let mut result = StreamPendingData::default();
let start_id = start.ok_or_else(|| ParsingError {
description: "IllegalState: Non-zero pending expects start id".into(),
})?;
let end_id = end.ok_or_else(|| ParsingError {
description: "IllegalState: Non-zero pending expects end id".into(),
})?;
result.count = count;
result.start_id = start_id;
result.end_id = end_id;
result.consumers = consumer_data
.into_iter()
.flatten()
.map(|(name, pending)| StreamInfoConsumer {
name,
pending: pending.parse().unwrap_or_default(),
..Default::default()
})
.collect();
Ok(StreamPendingReply::Data(result))
}
}
}
impl FromRedisValue for StreamPendingCountReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let mut reply = StreamPendingCountReply::default();
match v {
Value::Array(outer_tuple) => {
for outer in outer_tuple {
match outer {
Value::Array(inner_tuple) => match &inner_tuple[..] {
[Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
{
let id = String::from_utf8(id_bytes.to_vec())?;
let consumer = String::from_utf8(consumer_bytes.to_vec())?;
let last_delivered_ms = *last_delivered_ms_u64 as usize;
let times_delivered = *times_delivered_u64 as usize;
reply.ids.push(StreamPendingId {
id,
consumer,
last_delivered_ms,
times_delivered,
});
}
_ => fail!(ParsingError {
description: "Cannot parse redis data (3)".into()
}),
},
_ => fail!(ParsingError {
description: "Cannot parse redis data (2)".into()
}),
}
}
}
_ => fail!(ParsingError {
description: "Cannot parse redis data (1)".into()
}),
};
Ok(reply)
}
}
impl FromRedisValue for StreamInfoStreamReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let map: HashMap<String, Value> = from_redis_value(v)?;
let mut reply = StreamInfoStreamReply::default();
if let Some(v) = &map.get("last-generated-id") {
reply.last_generated_id = from_redis_value(v)?;
}
if let Some(v) = &map.get("radix-tree-nodes") {
reply.radix_tree_keys = from_redis_value(v)?;
}
if let Some(v) = &map.get("groups") {
reply.groups = from_redis_value(v)?;
}
if let Some(v) = &map.get("length") {
reply.length = from_redis_value(v)?;
}
if let Some(v) = &map.get("first-entry") {
reply.first_entry = StreamId::from_array_value(v)?;
}
if let Some(v) = &map.get("last-entry") {
reply.last_entry = StreamId::from_array_value(v)?;
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoConsumersReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoConsumersReply::default();
for map in consumers {
let mut c = StreamInfoConsumer::default();
if let Some(v) = &map.get("name") {
c.name = from_redis_value(v)?;
}
if let Some(v) = &map.get("pending") {
c.pending = from_redis_value(v)?;
}
if let Some(v) = &map.get("idle") {
c.idle = from_redis_value(v)?;
}
reply.consumers.push(c);
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoGroupsReply {
fn from_redis_value(v: &Value) -> Result<Self, ParsingError> {
let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoGroupsReply::default();
for map in groups {
let mut g = StreamInfoGroup::default();
if let Some(v) = &map.get("name") {
g.name = from_redis_value(v)?;
}
if let Some(v) = &map.get("pending") {
g.pending = from_redis_value(v)?;
}
if let Some(v) = &map.get("consumers") {
g.consumers = from_redis_value(v)?;
}
if let Some(v) = &map.get("last-delivered-id") {
g.last_delivered_id = from_redis_value(v)?;
}
if let Some(v) = &map.get("entries-read") {
g.entries_read = if let Value::Nil = v {
None
} else {
Some(from_redis_value(v)?)
};
}
if let Some(v) = &map.get("lag") {
g.lag = if let Value::Nil = v {
None
} else {
Some(from_redis_value(v)?)
};
}
reply.groups.push(g);
}
Ok(reply)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
let mut out: Vec<Vec<u8>> = Vec::new();
object.write_redis_args(&mut out);
let mut cmd: Vec<u8> = Vec::new();
out.iter_mut().for_each(|item| {
cmd.append(item);
cmd.push(b' ');
});
cmd.pop();
assert_eq!(cmd, expected);
}
mod stream_auto_claim_reply {
use super::*;
use crate::Value;
#[test]
fn short_response() {
let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
StreamAutoClaimReply::from_redis_value(&value).unwrap_err();
}
#[test]
fn parses_none_claimed_response() {
let value = Value::Array(vec![
Value::BulkString("0-0".into()),
Value::Array(vec![]),
Value::Array(vec![]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(&value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "0-0");
assert_eq!(reply.claimed.len(), 0);
assert_eq!(reply.deleted_ids.len(), 0);
}
#[test]
fn parses_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("test".into()),
Value::BulkString("other".into()),
Value::BulkString("whaterver".into()),
]),
]),
Value::Array(vec![
Value::BulkString("1713465536069-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("another test".into()),
Value::BulkString("other".into()),
Value::BulkString("something".into()),
]),
]),
]),
Value::Array(vec![Value::BulkString("123456789-0".into())]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(&value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
assert!(
matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
);
assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
assert_eq!(reply.deleted_ids.len(), 1);
assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
}
#[test]
fn parses_v6_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("test".into()),
Value::BulkString("other".into()),
Value::BulkString("whaterver".into()),
]),
]),
Value::Array(vec![
Value::BulkString("1713465536069-0".into()),
Value::Array(vec![
Value::BulkString("name".into()),
Value::BulkString("another test".into()),
Value::BulkString("other".into()),
Value::BulkString("something".into()),
]),
]),
]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(&value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 0);
}
#[test]
fn parses_justid_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::BulkString("1713465536069-0".into()),
]),
Value::Array(vec![Value::BulkString("123456789-0".into())]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(&value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 1);
assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
}
#[test]
fn parses_v6_justid_response() {
let value = Value::Array(vec![
Value::BulkString("1713465536578-0".into()),
Value::Array(vec![
Value::BulkString("1713465533411-0".into()),
Value::BulkString("1713465536069-0".into()),
]),
]);
let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(&value).unwrap();
assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
assert_eq!(reply.claimed.len(), 2);
let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
assert!(ids.contains(&"1713465533411-0"));
assert!(ids.contains(&"1713465536069-0"));
assert_eq!(reply.deleted_ids.len(), 0);
}
}
mod stream_trim_options {
use super::*;
#[test]
fn maxlen_trim() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
assert_command_eq(options, b"MAXLEN ~ 10");
}
#[test]
fn maxlen_exact_trim() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
assert_command_eq(options, b"MAXLEN = 10");
}
#[test]
fn maxlen_trim_limit() {
let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
}
#[test]
fn minid_trim_limit() {
let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
}
}
mod stream_add_options {
use super::*;
#[test]
fn the_default() {
let options = StreamAddOptions::default();
assert_command_eq(options, b"");
}
#[test]
fn with_maxlen_trim() {
let options = StreamAddOptions::default()
.trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
assert_command_eq(options, b"MAXLEN = 10");
}
#[test]
fn with_nomkstream() {
let options = StreamAddOptions::default().nomkstream();
assert_command_eq(options, b"NOMKSTREAM");
}
#[test]
fn with_nomkstream_and_maxlen_trim() {
let options = StreamAddOptions::default()
.nomkstream()
.trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
}
}
}