pub trait StreamCommands: ConnectionLike + Sized {
Show 29 methods
// Provided methods
fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
ids: &[ID],
) -> RedisResult<RV> { ... }
fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
id: ID,
items: &[(F, V)],
) -> RedisResult<RV> { ... }
fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
id: ID,
map: BTM,
) -> RedisResult<RV> { ... }
fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
id: ID,
items: &[(F, V)],
) -> RedisResult<RV> { ... }
fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
id: ID,
map: BTM,
) -> RedisResult<RV> { ... }
fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: MIT,
ids: &[ID],
) -> RedisResult<StreamClaimReply> { ... }
fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: MIT,
ids: &[ID],
options: StreamClaimOptions,
) -> RedisResult<RV> { ... }
fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
ids: &[ID],
) -> RedisResult<RV> { ... }
fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV> { ... }
fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV> { ... }
fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV> { ... }
fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
) -> RedisResult<RV> { ... }
fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
consumer: C,
) -> RedisResult<RV> { ... }
fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
&mut self,
key: K,
group: G,
) -> RedisResult<StreamInfoConsumersReply> { ... }
fn xinfo_groups<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamInfoGroupsReply> { ... }
fn xinfo_stream<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamInfoStreamReply> { ... }
fn xlen<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
) -> RedisResult<RV> { ... }
fn xpending<K: ToRedisArgs, G: ToRedisArgs>(
&mut self,
key: K,
group: G,
) -> RedisResult<StreamPendingReply> { ... }
fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
group: G,
start: S,
end: E,
count: C,
) -> RedisResult<StreamPendingCountReply> { ... }
fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>(
&mut self,
key: K,
group: G,
start: S,
end: E,
count: C,
consumer: CN,
) -> RedisResult<StreamPendingCountReply> { ... }
fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
&mut self,
key: K,
start: S,
end: E,
) -> RedisResult<StreamRangeReply> { ... }
fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
) -> RedisResult<RV> { ... }
fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
start: S,
end: E,
count: C,
) -> RedisResult<StreamRangeReply> { ... }
fn xread<K: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
keys: &[K],
ids: &[ID],
) -> RedisResult<StreamReadReply> { ... }
fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
keys: &[K],
ids: &[ID],
options: StreamReadOptions,
) -> RedisResult<StreamReadReply> { ... }
fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
&mut self,
key: K,
end: E,
start: S,
) -> RedisResult<StreamRangeReply> { ... }
fn xrevrange_all<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamRangeReply> { ... }
fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
end: E,
start: S,
count: C,
) -> RedisResult<StreamRangeReply> { ... }
fn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
) -> RedisResult<RV> { ... }
}
Expand description
Implementation of all redis stream commands.
Provided Methods§
Sourcefn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
ids: &[ID],
) -> RedisResult<RV>
fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ids: &[ID], ) -> RedisResult<RV>
Ack pending stream messages checked out by a consumer.
Sourcefn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
id: ID,
items: &[(F, V)],
) -> RedisResult<RV>
fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, items: &[(F, V)], ) -> RedisResult<RV>
Add a stream message by `key`. Use `*` as the `id` for the current timestamp.
Sourcefn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
id: ID,
map: BTM,
) -> RedisResult<RV>
fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, map: BTM, ) -> RedisResult<RV>
BTreeMap variant for adding a stream message by `key`.
Use `*` as the `id` for the current timestamp.
Sourcefn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
id: ID,
items: &[(F, V)],
) -> RedisResult<RV>
fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, items: &[(F, V)], ) -> RedisResult<RV>
Add a stream message while capping the stream at a maximum length.
Sourcefn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
id: ID,
map: BTM,
) -> RedisResult<RV>
fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, map: BTM, ) -> RedisResult<RV>
BTreeMap variant for adding a stream message while capping the stream at a maximum length.
Sourcefn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: MIT,
ids: &[ID],
) -> RedisResult<StreamClaimReply>
fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], ) -> RedisResult<StreamClaimReply>
Claim pending, unacked messages, after some period of time, currently checked out by another consumer.
This method only accepts the must-have arguments for claiming messages.
If optional arguments are required, see `xclaim_options` below.
Sourcefn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
consumer: C,
min_idle_time: MIT,
ids: &[ID],
options: StreamClaimOptions,
) -> RedisResult<RV>
fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], options: StreamClaimOptions, ) -> RedisResult<RV>
This is the optional arguments version for claiming unacked, pending messages currently checked out by another consumer.
use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamClaimOptions,StreamClaimReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();
// Claim all pending messages for key "k1",
// from group "g1", checked out by consumer "c1"
// for 10ms with RETRYCOUNT 2 and FORCE
let opts = StreamClaimOptions::default()
.with_force()
.retry(2);
let results: RedisResult<StreamClaimReply> =
con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
// All optional arguments return a `Result<StreamClaimReply>` with one exception:
// Passing JUSTID returns only the message `id` and omits the HashMap for each message.
let opts = StreamClaimOptions::default()
.with_justid();
let results: RedisResult<Vec<String>> =
con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
Sourcefn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
ids: &[ID],
) -> RedisResult<RV>
fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ids: &[ID], ) -> RedisResult<RV>
Deletes a list of `id`s for a given stream `key`.
Sourcefn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV>
fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>
This command is used for creating a `consumer group`. It expects the stream `key` to already exist. Otherwise, use `xgroup_create_mkstream` if it doesn’t.
The `id` is the starting message id all consumers should read from. Use `$` if you want all consumers to read from the last message added to the stream.
Sourcefn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV>
fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>
This is the alternate version for creating a `consumer group` which makes the stream if it doesn’t exist.
Sourcefn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
id: ID,
) -> RedisResult<RV>
fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>
Alter which `id` you want consumers to begin reading from for an existing `consumer group`.
Sourcefn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
) -> RedisResult<RV>
fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ) -> RedisResult<RV>
Destroy an existing `consumer group` for a given stream `key`.
Sourcefn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
group: G,
consumer: C,
) -> RedisResult<RV>
fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, ) -> RedisResult<RV>
This deletes a `consumer` from an existing `consumer group` for a given stream `key`.
Sourcefn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
&mut self,
key: K,
group: G,
) -> RedisResult<StreamInfoConsumersReply>
fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamInfoConsumersReply>
This returns all info details about which consumers have read messages for a given `consumer group`.
Take note of the StreamInfoConsumersReply return type.
It’s possible this return value might not contain new fields added by Redis in future versions.
Sourcefn xinfo_groups<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamInfoGroupsReply>
fn xinfo_groups<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoGroupsReply>
Returns all `consumer group`s created for a given stream `key`.
Take note of the StreamInfoGroupsReply return type.
It’s possible this return value might not contain new fields added by Redis in future versions.
Sourcefn xinfo_stream<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamInfoStreamReply>
fn xinfo_stream<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoStreamReply>
Returns info about high-level stream details (first & last message `id`, length, number of groups, etc.).
Take note of the StreamInfoStreamReply return type.
It’s possible this return value might not contain new fields added by Redis in future versions.
Sourcefn xlen<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
) -> RedisResult<RV>
fn xlen<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV>
Returns the number of messages for a given stream `key`.
Sourcefn xpending<K: ToRedisArgs, G: ToRedisArgs>(
&mut self,
key: K,
group: G,
) -> RedisResult<StreamPendingReply>
fn xpending<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamPendingReply>
This is a basic version of making XPENDING command calls which only passes a stream `key` and `consumer group`, and it returns details about which consumers have pending messages that haven’t been acked.
You can use this method along with `xclaim` or `xclaim_options` for determining which messages need to be retried.
Take note of the StreamPendingReply return type.
Sourcefn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
group: G,
start: S,
end: E,
count: C,
) -> RedisResult<StreamPendingCountReply>
fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, ) -> RedisResult<StreamPendingCountReply>
This XPENDING version returns a list of all messages over the range. You can use this for paginating pending messages (but without the message HashMap).
Start and end follow the same rules as `xrange` args. Set start to `-` and end to `+` for the entire stream.
Take note of the StreamPendingCountReply return type.
Sourcefn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>(
&mut self,
key: K,
group: G,
start: S,
end: E,
count: C,
consumer: CN,
) -> RedisResult<StreamPendingCountReply>
fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, consumer: CN, ) -> RedisResult<StreamPendingCountReply>
An alternate version of `xpending_count` which filters by `consumer` name.
Start and end follow the same rules as `xrange` args. Set start to `-` and end to `+` for the entire stream.
Take note of the StreamPendingCountReply return type.
Sourcefn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
&mut self,
key: K,
start: S,
end: E,
) -> RedisResult<StreamRangeReply>
fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>( &mut self, key: K, start: S, end: E, ) -> RedisResult<StreamRangeReply>
Returns a range of messages in a given stream `key`.
Set `start` to `-` to begin at the first message.
Set `end` to `+` to end at the most recent message.
You can pass a message `id` to both `start` and `end`.
Take note of the StreamRangeReply return type.
Sourcefn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
) -> RedisResult<RV>
fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV>
A helper method for automatically returning all messages in a stream by `key`.
Use with caution!
Sourcefn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
start: S,
end: E,
count: C,
) -> RedisResult<StreamRangeReply>
fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, start: S, end: E, count: C, ) -> RedisResult<StreamRangeReply>
A method for paginating a stream by `key`.
Sourcefn xread<K: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
keys: &[K],
ids: &[ID],
) -> RedisResult<StreamReadReply>
fn xread<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], ) -> RedisResult<StreamReadReply>
Read a list of `id`s for each stream `key`.
This is the basic form of reading streams.
For more advanced control, like blocking, limiting, or reading by `consumer group`, see `xread_options`.
Sourcefn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
&mut self,
keys: &[K],
ids: &[ID],
options: StreamReadOptions,
) -> RedisResult<StreamReadReply>
fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], options: StreamReadOptions, ) -> RedisResult<StreamReadReply>
This method handles setting optional arguments for `XREAD` or `XREADGROUP` Redis commands.
use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamReadOptions,StreamReadReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();
// Read 10 messages from the start of the stream,
// without registering as a consumer group.
let opts = StreamReadOptions::default()
.count(10);
let results: RedisResult<StreamReadReply> =
con.xread_options(&["k1"], &["0"], opts);
// Read all undelivered messages for a given
// consumer group. Be advised: the consumer group must already
// exist before making this call. Also note: we're passing
// '>' as the id here, which means all undelivered messages.
let opts = StreamReadOptions::default()
.group("group-1", "consumer-1");
let results: RedisResult<StreamReadReply> =
con.xread_options(&["k1"], &[">"], opts);
Sourcefn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
&mut self,
key: K,
end: E,
start: S,
) -> RedisResult<StreamRangeReply>
fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>( &mut self, key: K, end: E, start: S, ) -> RedisResult<StreamRangeReply>
This is the reverse version of `xrange`.
The same rules apply for `start` and `end` here.
Sourcefn xrevrange_all<K: ToRedisArgs>(
&mut self,
key: K,
) -> RedisResult<StreamRangeReply>
fn xrevrange_all<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamRangeReply>
This is the reverse version of `xrange_all`.
The same rules apply for `start` and `end` here.
Sourcefn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
&mut self,
key: K,
end: E,
start: S,
count: C,
) -> RedisResult<StreamRangeReply>
fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, end: E, start: S, count: C, ) -> RedisResult<StreamRangeReply>
This is the reverse version of `xrange_count`.
The same rules apply for `start` and `end` here.
Sourcefn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
&mut self,
key: K,
maxlen: StreamMaxlen,
) -> RedisResult<RV>
fn xtrim<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, ) -> RedisResult<RV>
Trim a stream `key` to a `MAXLEN` count.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.