use std::collections::HashMap;
use crate::{
prepare_command,
resp::{
cmd, CommandArg, CommandArgs, FromKeyValueValueArray, FromValue, HashMapExt, IntoArgs,
KeyValueArgOrCollection, SingleArgOrCollection, Value,
},
PreparedCommand, Result,
};
pub trait StreamCommands {
fn xack<K, G, I, II>(&mut self, key: K, group: G, ids: II) -> PreparedCommand<Self, usize>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
I: Into<CommandArg>,
II: SingleArgOrCollection<I>,
{
prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
}
fn xadd<K, I, F, V, FFVV, R>(
&mut self,
key: K,
stream_id: I,
items: FFVV,
options: XAddOptions,
) -> PreparedCommand<Self, R>
where
Self: Sized,
K: Into<CommandArg>,
I: Into<CommandArg>,
F: Into<CommandArg>,
V: Into<CommandArg>,
FFVV: KeyValueArgOrCollection<F, V>,
R: FromValue,
{
prepare_command(
self,
cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
)
}
fn xautoclaim<K, G, C, I, V>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: u64,
start: I,
options: XAutoClaimOptions,
) -> PreparedCommand<Self, XAutoClaimResult<V>>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
C: Into<CommandArg>,
I: Into<CommandArg>,
V: FromValue,
{
prepare_command(
self,
cmd("XAUTOCLAIM")
.arg(key)
.arg(group)
.arg(consumer)
.arg(min_idle_time)
.arg(start)
.arg(options),
)
}
fn xclaim<K, G, C, I, II, V>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: u64,
ids: II,
options: XClaimOptions,
) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
C: Into<CommandArg>,
I: Into<CommandArg>,
II: SingleArgOrCollection<I>,
V: FromValue,
{
prepare_command(
self,
cmd("XCLAIM")
.arg(key)
.arg(group)
.arg(consumer)
.arg(min_idle_time)
.arg(ids)
.arg(options),
)
}
fn xdel<K, I, II>(&mut self, key: K, ids: II) -> PreparedCommand<Self, usize>
where
Self: Sized,
K: Into<CommandArg>,
I: Into<CommandArg>,
II: SingleArgOrCollection<I>,
{
prepare_command(self, cmd("XDEL").arg(key).arg(ids))
}
fn xgroup_create<K, G, I>(
&mut self,
key: K,
groupname: G,
id: I,
options: XGroupCreateOptions,
) -> PreparedCommand<Self, bool>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
I: Into<CommandArg>,
{
prepare_command(
self,
cmd("XGROUP")
.arg("CREATE")
.arg(key)
.arg(groupname)
.arg(id)
.arg(options),
)
}
fn xgroup_createconsumer<K, G, C>(
&mut self,
key: K,
groupname: G,
consumername: C,
) -> PreparedCommand<Self, bool>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
C: Into<CommandArg>,
{
prepare_command(
self,
cmd("XGROUP")
.arg("CREATECONSUMER")
.arg(key)
.arg(groupname)
.arg(consumername),
)
}
fn xgroup_delconsumer<K, G, C>(
&mut self,
key: K,
groupname: G,
consumername: C,
) -> PreparedCommand<Self, usize>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
C: Into<CommandArg>,
{
prepare_command(
self,
cmd("XGROUP")
.arg("DELCONSUMER")
.arg(key)
.arg(groupname)
.arg(consumername),
)
}
fn xgroup_destroy<K, G>(&mut self, key: K, groupname: G) -> PreparedCommand<Self, bool>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
{
prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
}
fn xgroup_setid<K, G, I>(
&mut self,
key: K,
groupname: G,
id: I,
entries_read: Option<usize>,
) -> PreparedCommand<Self, ()>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
I: Into<CommandArg>,
{
prepare_command(
self,
cmd("XGROUP")
.arg("SETID")
.arg(key)
.arg(groupname)
.arg(id)
.arg(entries_read.map(|e| ("ENTRIESREAD", e))),
)
}
fn xinfo_consumers<K, G>(
&mut self,
key: K,
groupname: G,
) -> PreparedCommand<Self, Vec<XConsumerInfo>>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
{
prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
}
fn xinfo_groups<K>(&mut self, key: K) -> PreparedCommand<Self, Vec<XGroupInfo>>
where
Self: Sized,
K: Into<CommandArg>,
{
prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
}
fn xinfo_stream<K>(
&mut self,
key: K,
options: XInfoStreamOptions,
) -> PreparedCommand<Self, XStreamInfo>
where
Self: Sized,
K: Into<CommandArg>,
{
prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
}
fn xlen<K>(&mut self, key: K) -> PreparedCommand<Self, usize>
where
Self: Sized,
K: Into<CommandArg>,
{
prepare_command(self, cmd("XLEN").arg(key))
}
fn xpending<K, G>(&mut self, key: K, group: G) -> PreparedCommand<Self, XPendingResult>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
{
prepare_command(self, cmd("XPENDING").arg(key).arg(group))
}
fn xpending_with_options<K, G>(
&mut self,
key: K,
group: G,
options: XPendingOptions,
) -> PreparedCommand<Self, Vec<XPendingMessageResult>>
where
Self: Sized,
K: Into<CommandArg>,
G: Into<CommandArg>,
{
prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
}
fn xrange<K, S, E, V>(
&mut self,
key: K,
start: S,
end: E,
count: Option<usize>,
) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
where
Self: Sized,
K: Into<CommandArg>,
S: Into<CommandArg>,
E: Into<CommandArg>,
V: FromValue,
{
prepare_command(
self,
cmd("XRANGE")
.arg(key)
.arg(start)
.arg(end)
.arg(count.map(|c| ("COUNT", c))),
)
}
fn xread<K, KK, I, II, V, R>(
&mut self,
options: XReadOptions,
keys: KK,
ids: II,
) -> PreparedCommand<Self, R>
where
Self: Sized,
K: Into<CommandArg>,
KK: SingleArgOrCollection<K>,
I: Into<CommandArg>,
II: SingleArgOrCollection<I>,
V: FromValue,
R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
{
prepare_command(
self,
cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
)
}
fn xreadgroup<G, C, K, KK, I, II, V, R>(
&mut self,
group: G,
consumer: C,
options: XReadGroupOptions,
keys: KK,
ids: II,
) -> PreparedCommand<Self, R>
where
Self: Sized,
G: Into<CommandArg>,
C: Into<CommandArg>,
K: Into<CommandArg>,
KK: SingleArgOrCollection<K>,
I: Into<CommandArg>,
II: SingleArgOrCollection<I>,
V: FromValue,
R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
{
prepare_command(
self,
cmd("XREADGROUP")
.arg("GROUP")
.arg(group)
.arg(consumer)
.arg(options)
.arg("STREAMS")
.arg(keys)
.arg(ids),
)
}
fn xrevrange<K, E, S, V>(
&mut self,
key: K,
end: E,
start: S,
count: Option<usize>,
) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
where
Self: Sized,
K: Into<CommandArg>,
E: Into<CommandArg>,
S: Into<CommandArg>,
V: FromValue,
{
prepare_command(
self,
cmd("XREVRANGE")
.arg(key)
.arg(end)
.arg(start)
.arg(count.map(|c| ("COUNT", c))),
)
}
fn xtrim<K>(&mut self, key: K, options: XTrimOptions) -> PreparedCommand<Self, usize>
where
Self: Sized,
K: Into<CommandArg>,
{
prepare_command(self, cmd("XTRIM").arg(key).arg(options))
}
}
#[derive(Default)]
pub struct XAddOptions {
command_args: CommandArgs,
}
impl XAddOptions {
#[must_use]
pub fn no_mk_stream(self) -> Self {
Self {
command_args: self.command_args.arg("NOMKSTREAM"),
}
}
#[must_use]
pub fn trim_options(self, trim_options: XTrimOptions) -> Self {
Self {
command_args: self.command_args.arg(trim_options),
}
}
}
impl IntoArgs for XAddOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
pub enum XTrimOperator {
None,
Equal,
Approximately,
}
impl IntoArgs for XTrimOperator {
fn into_args(self, args: CommandArgs) -> CommandArgs {
match self {
XTrimOperator::None => args,
XTrimOperator::Equal => args.arg(CommandArg::Str("=")),
XTrimOperator::Approximately => args.arg(CommandArg::Str("~")),
}
}
}
impl Default for XTrimOperator {
fn default() -> Self {
XTrimOperator::None
}
}
#[derive(Default)]
pub struct XTrimOptions {
command_args: CommandArgs,
}
impl XTrimOptions {
#[must_use]
pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
Self {
command_args: CommandArgs::default()
.arg("MAXLEN")
.arg(operator)
.arg(threshold),
}
}
#[must_use]
pub fn min_id<I: Into<CommandArg>>(operator: XTrimOperator, threshold_id: I) -> Self {
Self {
command_args: CommandArgs::default()
.arg("MINID")
.arg(operator)
.arg(threshold_id),
}
}
#[must_use]
pub fn limit(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("LIMIT").arg(count),
}
}
}
impl IntoArgs for XTrimOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
#[derive(Default)]
pub struct XAutoClaimOptions {
command_args: CommandArgs,
}
impl XAutoClaimOptions {
#[must_use]
pub fn count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("COUNT").arg(count),
}
}
#[must_use]
pub fn just_id(self) -> Self {
Self {
command_args: self.command_args.arg("JUSTID"),
}
}
}
impl IntoArgs for XAutoClaimOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
pub struct StreamEntry<V>
where
V: FromValue,
{
pub stream_id: String,
pub items: HashMap<String, V>,
}
impl<V> FromValue for StreamEntry<V>
where
V: FromValue,
{
fn from_value(value: Value) -> Result<Self> {
let (stream_id, items): (String, HashMap<String, V>) = value.into()?;
Ok(Self { stream_id, items })
}
}
pub struct XAutoClaimResult<V>
where
V: FromValue,
{
pub start_stream_id: String,
pub entries: Vec<StreamEntry<V>>,
pub deleted_id: Vec<String>,
}
impl<V> FromValue for XAutoClaimResult<V>
where
V: FromValue,
{
fn from_value(value: Value) -> Result<Self> {
let (start_stream_id, entries, deleted_id): (String, Vec<StreamEntry<V>>, Vec<String>) =
value.into()?;
Ok(Self {
start_stream_id,
entries,
deleted_id,
})
}
}
#[derive(Default)]
pub struct XClaimOptions {
command_args: CommandArgs,
}
impl XClaimOptions {
#[must_use]
pub fn idle_time(self, idle_time_millis: u64) -> Self {
Self {
command_args: self.command_args.arg("IDLE").arg(idle_time_millis),
}
}
#[must_use]
pub fn time(self, unix_time_milliseconds: u64) -> Self {
Self {
command_args: self.command_args.arg("TIME").arg(unix_time_milliseconds),
}
}
#[must_use]
pub fn retry_count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("RETRYCOUNT").arg(count),
}
}
#[must_use]
pub fn force(self) -> Self {
Self {
command_args: self.command_args.arg("FORCE"),
}
}
#[must_use]
pub fn just_id(self) -> Self {
Self {
command_args: self.command_args.arg("JUSTID"),
}
}
}
impl IntoArgs for XClaimOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
#[derive(Default)]
pub struct XGroupCreateOptions {
command_args: CommandArgs,
}
impl XGroupCreateOptions {
#[must_use]
pub fn mk_stream(self) -> Self {
Self {
command_args: self.command_args.arg("MKSTREAM"),
}
}
#[must_use]
pub fn entries_read(self, entries_read: usize) -> Self {
Self {
command_args: self.command_args.arg("ENTRIESREAD").arg(entries_read),
}
}
}
impl IntoArgs for XGroupCreateOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
pub struct XConsumerInfo {
pub name: String,
pub pending: usize,
pub idle_millis: u64,
}
impl FromValue for XConsumerInfo {
fn from_value(value: Value) -> Result<Self> {
let mut values: HashMap<String, Value> = value.into()?;
Ok(Self {
name: values.remove_with_result("name")?.into()?,
pending: values.remove_with_result("pending")?.into()?,
idle_millis: values.remove_with_result("idle")?.into()?,
})
}
}
pub struct XGroupInfo {
pub name: String,
pub consumers: usize,
pub pending: usize,
pub last_delivered_id: String,
pub entries_read: Option<usize>,
pub lag: Option<usize>,
}
impl FromValue for XGroupInfo {
fn from_value(value: Value) -> Result<Self> {
let mut values: HashMap<String, Value> = value.into()?;
Ok(Self {
name: values.remove_with_result("name")?.into()?,
consumers: values.remove_with_result("consumers")?.into()?,
pending: values.remove_with_result("pending")?.into()?,
last_delivered_id: values.remove_with_result("last-delivered-id")?.into()?,
entries_read: values.remove_with_result("entries-read")?.into()?,
lag: values.remove_with_result("lag")?.into()?,
})
}
}
#[derive(Default)]
pub struct XInfoStreamOptions {
command_args: CommandArgs,
}
impl XInfoStreamOptions {
#[must_use]
pub fn full(self) -> Self {
Self {
command_args: self.command_args.arg("FULL"),
}
}
#[must_use]
pub fn count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("COUNT").arg(count),
}
}
}
impl IntoArgs for XInfoStreamOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
pub struct XStreamInfo {
pub length: usize,
pub radix_tree_keys: usize,
pub radix_tree_nodes: usize,
pub groups: usize,
pub last_generated_id: String,
pub max_deleted_entry_id: String,
pub entries_added: usize,
pub first_entry: StreamEntry<String>,
pub last_entry: StreamEntry<String>,
pub recorded_first_entry_id: String,
}
impl FromValue for XStreamInfo {
fn from_value(value: Value) -> Result<Self> {
let mut values: HashMap<String, Value> = value.into()?;
Ok(Self {
length: values.remove_with_result("length")?.into()?,
radix_tree_keys: values.remove_with_result("radix-tree-keys")?.into()?,
radix_tree_nodes: values.remove_with_result("radix-tree-nodes")?.into()?,
groups: values.remove_with_result("groups")?.into()?,
last_generated_id: values.remove_with_result("last-generated-id")?.into()?,
max_deleted_entry_id: values.remove_with_result("max-deleted-entry-id")?.into()?,
entries_added: values.remove_with_result("entries-added")?.into()?,
first_entry: values.remove_with_result("first-entry")?.into()?,
last_entry: values.remove_with_result("last-entry")?.into()?,
recorded_first_entry_id: values
.remove_with_result("recorded-first-entry-id")?
.into()?,
})
}
}
#[derive(Default)]
pub struct XReadOptions {
command_args: CommandArgs,
}
impl XReadOptions {
#[must_use]
pub fn count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("COUNT").arg(count),
}
}
#[must_use]
pub fn block(self, milliseconds: u64) -> Self {
Self {
command_args: self.command_args.arg("BLOCK").arg(milliseconds),
}
}
}
impl IntoArgs for XReadOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
#[derive(Default)]
pub struct XReadGroupOptions {
command_args: CommandArgs,
}
impl XReadGroupOptions {
#[must_use]
pub fn count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg("COUNT").arg(count),
}
}
#[must_use]
pub fn block(self, milliseconds: u64) -> Self {
Self {
command_args: self.command_args.arg("BLOCK").arg(milliseconds),
}
}
#[must_use]
pub fn no_ack(self) -> Self {
Self {
command_args: self.command_args.arg("NOACK"),
}
}
}
impl IntoArgs for XReadGroupOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
#[derive(Default)]
pub struct XPendingOptions {
command_args: CommandArgs,
}
impl XPendingOptions {
#[must_use]
pub fn idle(self, min_idle_time: u64) -> Self {
Self {
command_args: self.command_args.arg("IDLE").arg(min_idle_time),
}
}
#[must_use]
pub fn start<S: Into<CommandArg>>(self, start: S) -> Self {
Self {
command_args: self.command_args.arg(start),
}
}
#[must_use]
pub fn end<E: Into<CommandArg>>(self, end: E) -> Self {
Self {
command_args: self.command_args.arg(end),
}
}
#[must_use]
pub fn count(self, count: usize) -> Self {
Self {
command_args: self.command_args.arg(count),
}
}
#[must_use]
pub fn consumer<C: Into<CommandArg>>(self, consumer: C) -> Self {
Self {
command_args: self.command_args.arg(consumer),
}
}
}
impl IntoArgs for XPendingOptions {
fn into_args(self, args: CommandArgs) -> CommandArgs {
args.arg(self.command_args)
}
}
pub struct XPendingResult {
pub num_pending_messages: usize,
pub smallest_id: String,
pub greatest_id: String,
pub consumers: Vec<XPendingConsumer>,
}
impl FromValue for XPendingResult {
fn from_value(value: Value) -> Result<Self> {
let (num_pending_messages, smallest_id, greatest_id, consumers): (
usize,
String,
String,
Vec<XPendingConsumer>,
) = value.into()?;
Ok(Self {
num_pending_messages,
smallest_id,
greatest_id,
consumers,
})
}
}
pub struct XPendingConsumer {
pub consumer: String,
pub num_messages: usize,
}
impl FromValue for XPendingConsumer {
fn from_value(value: Value) -> Result<Self> {
let (consumer, num_messages): (String, usize) = value.into()?;
Ok(Self {
consumer,
num_messages,
})
}
}
pub struct XPendingMessageResult {
pub message_id: String,
pub consumer: String,
pub elapsed_millis: u64,
pub times_delivered: usize,
}
impl FromValue for XPendingMessageResult {
fn from_value(value: Value) -> Result<Self> {
let (message_id, consumer, elapsed_millis, times_delivered): (String, String, u64, usize) =
value.into()?;
Ok(Self {
message_id,
consumer,
elapsed_millis,
times_delivered,
})
}
}