Trait StreamCommands

Source
pub trait StreamCommands: ConnectionLike + Sized {
Show 29 methods // Provided methods fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ids: &[ID], ) -> RedisResult<RV> { ... } fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, items: &[(F, V)], ) -> RedisResult<RV> { ... } fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, map: BTM, ) -> RedisResult<RV> { ... } fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, items: &[(F, V)], ) -> RedisResult<RV> { ... } fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, map: BTM, ) -> RedisResult<RV> { ... } fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], ) -> RedisResult<StreamClaimReply> { ... } fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], options: StreamClaimOptions, ) -> RedisResult<RV> { ... } fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ids: &[ID], ) -> RedisResult<RV> { ... } fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV> { ... } fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV> { ... } fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV> { ... 
} fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ) -> RedisResult<RV> { ... } fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, ) -> RedisResult<RV> { ... } fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamInfoConsumersReply> { ... } fn xinfo_groups<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoGroupsReply> { ... } fn xinfo_stream<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoStreamReply> { ... } fn xlen<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV> { ... } fn xpending<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamPendingReply> { ... } fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, ) -> RedisResult<StreamPendingCountReply> { ... } fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, consumer: CN, ) -> RedisResult<StreamPendingCountReply> { ... } fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>( &mut self, key: K, start: S, end: E, ) -> RedisResult<StreamRangeReply> { ... } fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV> { ... } fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, start: S, end: E, count: C, ) -> RedisResult<StreamRangeReply> { ... } fn xread<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], ) -> RedisResult<StreamReadReply> { ... } fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], options: StreamReadOptions, ) -> RedisResult<StreamReadReply> { ... 
} fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>( &mut self, key: K, end: E, start: S, ) -> RedisResult<StreamRangeReply> { ... } fn xrevrange_all<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamRangeReply> { ... } fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, end: E, start: S, count: C, ) -> RedisResult<StreamRangeReply> { ... } fn xtrim<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, ) -> RedisResult<RV> { ... }
}
Expand description

Implementation of all Redis stream commands.

Provided Methods§

Source

fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ids: &[ID], ) -> RedisResult<RV>

Ack pending stream messages checked out by a consumer.

Source

fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, items: &[(F, V)], ) -> RedisResult<RV>

Add a stream message by key. Use * as the id for the current timestamp.

Source

fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, id: ID, map: BTM, ) -> RedisResult<RV>

BTreeMap variant for adding a stream message by key. Use * as the id for the current timestamp.

Source

fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, items: &[(F, V)], ) -> RedisResult<RV>

Add a stream message while capping the stream at a maximum length.

Source

fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, id: ID, map: BTM, ) -> RedisResult<RV>

BTreeMap variant for adding a stream message while capping the stream at a maximum length.

Source

fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], ) -> RedisResult<StreamClaimReply>

Claim pending, unacked messages that are currently checked out by another consumer, after some period of idle time (see min_idle_time).

This method only accepts the must-have arguments for claiming messages. If optional arguments are required, see xclaim_options below.

Source

fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, min_idle_time: MIT, ids: &[ID], options: StreamClaimOptions, ) -> RedisResult<RV>

This is the optional arguments version for claiming unacked, pending messages currently checked out by another consumer.

use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamClaimOptions,StreamClaimReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();

// Claim all pending messages for key "k1",
// from group "g1", checked out by consumer "c1"
// for 10ms with RETRYCOUNT 2 and FORCE

let opts = StreamClaimOptions::default()
    .with_force()
    .retry(2);
let results: RedisResult<StreamClaimReply> =
    con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);

// All optional arguments return a `Result<StreamClaimReply>` with one exception:
// Passing JUSTID returns only the message `id` and omits the HashMap for each message.

let opts = StreamClaimOptions::default()
    .with_justid();
let results: RedisResult<Vec<String>> =
    con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
Source

fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ids: &[ID], ) -> RedisResult<RV>

Deletes a list of ids for a given stream key.

Source

fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>

This command is used for creating a consumer group. It expects the stream key to already exist; if it doesn’t, use xgroup_create_mkstream instead. The id is the starting message id all consumers should read from. Use $ if you want all consumers to read from the last message added to the stream.

Source

fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>

This is the alternate version for creating a consumer group which makes the stream if it doesn’t exist.

Source

fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, id: ID, ) -> RedisResult<RV>

Alter the id from which consumers of an existing consumer group begin reading.

Source

fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, ) -> RedisResult<RV>

Destroy an existing consumer group for a given stream key.

Source

fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, group: G, consumer: C, ) -> RedisResult<RV>

This deletes a consumer from an existing consumer group for a given stream key.

Source

fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamInfoConsumersReply>

This returns all info details about which consumers have read messages for a given consumer group. Take note of the StreamInfoConsumersReply return type.

It’s possible this return value might not contain new fields added by Redis in future versions.

Source

fn xinfo_groups<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoGroupsReply>

Returns all consumer groups created for a given stream key. Take note of the StreamInfoGroupsReply return type.

It’s possible this return value might not contain new fields added by Redis in future versions.

Source

fn xinfo_stream<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamInfoStreamReply>

Returns info about high-level stream details (first & last message id, length, number of groups, etc.). Take note of the StreamInfoStreamReply return type.

It’s possible this return value might not contain new fields added by Redis in future versions.

Source

fn xlen<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV>

Returns the number of messages for a given stream key.

Source

fn xpending<K: ToRedisArgs, G: ToRedisArgs>( &mut self, key: K, group: G, ) -> RedisResult<StreamPendingReply>

This is a basic version of making XPENDING command calls, which only passes a stream key and consumer group. It returns details about which consumers have pending messages that haven’t been acked.

You can use this method along with xclaim or xclaim_options for determining which messages need to be retried.

Take note of the StreamPendingReply return type.

Source

fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, ) -> RedisResult<StreamPendingCountReply>

This XPENDING version returns a list of all messages over the range. You can use this for paginating pending messages (but without the message HashMap).

Start and end follow the same rules as the xrange args. Set start to - and end to + for the entire stream.

Take note of the StreamPendingCountReply return type.

Source

fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>( &mut self, key: K, group: G, start: S, end: E, count: C, consumer: CN, ) -> RedisResult<StreamPendingCountReply>

An alternate version of xpending_count which filters by consumer name.

Start and end follow the same rules as the xrange args. Set start to - and end to + for the entire stream.

Take note of the StreamPendingCountReply return type.

Source

fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>( &mut self, key: K, start: S, end: E, ) -> RedisResult<StreamRangeReply>

Returns a range of messages in a given stream key.

Set start to - to begin at the first message. Set end to + to end at the most recent message. You can also pass a message id for both start and end.

Take note of the StreamRangeReply return type.

Source

fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, ) -> RedisResult<RV>

A helper method for automatically returning all messages in a stream by key. Use with caution!

Source

fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, start: S, end: E, count: C, ) -> RedisResult<StreamRangeReply>

A method for paginating a stream by key.

Source

fn xread<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], ) -> RedisResult<StreamReadReply>

Read a list of ids for each stream key. This is the basic form of reading streams. For more advanced control, like blocking, limiting, or reading by consumer group, see xread_options.

Source

fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>( &mut self, keys: &[K], ids: &[ID], options: StreamReadOptions, ) -> RedisResult<StreamReadReply>

This method handles setting optional arguments for XREAD or XREADGROUP Redis commands.

use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamReadOptions,StreamReadReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();

// Read 10 messages from the start of the stream,
// without registering as a consumer group.

let opts = StreamReadOptions::default()
    .count(10);
let results: RedisResult<StreamReadReply> =
    con.xread_options(&["k1"], &["0"], opts);

// Read all undelivered messages for a given
// consumer group. Be advised: the consumer group must already
// exist before making this call. Also note: we're passing
// '>' as the id here, which means all undelivered messages.

let opts = StreamReadOptions::default()
    .group("group-1", "consumer-1");
let results: RedisResult<StreamReadReply> =
    con.xread_options(&["k1"], &[">"], opts);
Source

fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>( &mut self, key: K, end: E, start: S, ) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange. The same rules apply for start and end here.

Source

fn xrevrange_all<K: ToRedisArgs>( &mut self, key: K, ) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange_all. It takes only the stream key; there are no start or end arguments.

Source

fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>( &mut self, key: K, end: E, start: S, count: C, ) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange_count. The same rules apply for start and end here.

Source

fn xtrim<K: ToRedisArgs, RV: FromRedisValue>( &mut self, key: K, maxlen: StreamMaxlen, ) -> RedisResult<RV>

Trim a stream key to a MAXLEN count.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<T> StreamCommands for T
where T: ConnectionLike,