[][src]Trait redis_streams::StreamCommands

pub trait StreamCommands: ConnectionLike + Sized {
    fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        ids: &[ID]
    ) -> RedisResult<RV> { ... }
fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        id: ID,
        items: &[(F, V)]
    ) -> RedisResult<RV> { ... }
fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        id: ID,
        map: BTM
    ) -> RedisResult<RV> { ... }
fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        maxlen: StreamMaxlen,
        id: ID,
        items: &[(F, V)]
    ) -> RedisResult<RV> { ... }
fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        maxlen: StreamMaxlen,
        id: ID,
        map: BTM
    ) -> RedisResult<RV> { ... }
fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
        &mut self,
        key: K,
        group: G,
        consumer: C,
        min_idle_time: MIT,
        ids: &[ID]
    ) -> RedisResult<StreamClaimReply> { ... }
fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        consumer: C,
        min_idle_time: MIT,
        ids: &[ID],
        options: StreamClaimOptions
    ) -> RedisResult<RV> { ... }
fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        ids: &[ID]
    ) -> RedisResult<RV> { ... }
fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        id: ID
    ) -> RedisResult<RV> { ... }
fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        id: ID
    ) -> RedisResult<RV> { ... }
fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        id: ID
    ) -> RedisResult<RV> { ... }
fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G
    ) -> RedisResult<RV> { ... }
fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        group: G,
        consumer: C
    ) -> RedisResult<RV> { ... }
fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
        &mut self,
        key: K,
        group: G
    ) -> RedisResult<StreamInfoConsumersReply> { ... }
fn xinfo_groups<K: ToRedisArgs>(
        &mut self,
        key: K
    ) -> RedisResult<StreamInfoGroupsReply> { ... }
fn xinfo_stream<K: ToRedisArgs>(
        &mut self,
        key: K
    ) -> RedisResult<StreamInfoStreamReply> { ... }
fn xlen<K: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K
    ) -> RedisResult<RV> { ... }
fn xpending<K: ToRedisArgs, G: ToRedisArgs>(
        &mut self,
        key: K,
        group: G
    ) -> RedisResult<StreamPendingReply> { ... }
fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
        &mut self,
        key: K,
        group: G,
        start: S,
        end: E,
        count: C
    ) -> RedisResult<StreamPendingCountReply> { ... }
fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>(
        &mut self,
        key: K,
        group: G,
        start: S,
        end: E,
        count: C,
        consumer: CN
    ) -> RedisResult<StreamPendingCountReply> { ... }
fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
        &mut self,
        key: K,
        start: S,
        end: E
    ) -> RedisResult<StreamRangeReply> { ... }
fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K
    ) -> RedisResult<RV> { ... }
fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
        &mut self,
        key: K,
        start: S,
        end: E,
        count: C
    ) -> RedisResult<StreamRangeReply> { ... }
fn xread<K: ToRedisArgs, ID: ToRedisArgs>(
        &mut self,
        keys: &[K],
        ids: &[ID]
    ) -> RedisResult<StreamReadReply> { ... }
fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
        &mut self,
        keys: &[K],
        ids: &[ID],
        options: StreamReadOptions
    ) -> RedisResult<StreamReadReply> { ... }
fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
        &mut self,
        key: K,
        end: E,
        start: S
    ) -> RedisResult<StreamRangeReply> { ... }
fn xrevrange_all<K: ToRedisArgs>(
        &mut self,
        key: K
    ) -> RedisResult<StreamRangeReply> { ... }
fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
        &mut self,
        key: K,
        end: E,
        start: S,
        count: C
    ) -> RedisResult<StreamRangeReply> { ... }
fn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
        &mut self,
        key: K,
        maxlen: StreamMaxlen
    ) -> RedisResult<RV> { ... } }

Implementation of all redis stream commands.

Provided methods

fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    ids: &[ID]
) -> RedisResult<RV>

Ack pending stream messages checked out by a consumer.

fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    id: ID,
    items: &[(F, V)]
) -> RedisResult<RV>

Add a stream message by key. Use * as the id for the current timestamp.

fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    id: ID,
    map: BTM
) -> RedisResult<RV>

BTreeMap variant for adding a stream message by key. Use * as the id for the current timestamp.

fn xadd_maxlen<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    maxlen: StreamMaxlen,
    id: ID,
    items: &[(F, V)]
) -> RedisResult<RV>

Add a stream message while capping the stream at a maxlength.

fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    maxlen: StreamMaxlen,
    id: ID,
    map: BTM
) -> RedisResult<RV>

BTreeMap variant for adding a stream message while capping the stream at a maxlength.

fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
    &mut self,
    key: K,
    group: G,
    consumer: C,
    min_idle_time: MIT,
    ids: &[ID]
) -> RedisResult<StreamClaimReply>

Claim pending, unacked messages, after some period of time, currently checked out by another consumer.

This method only accepts the must-have arguments for claiming messages. If optional arguments are required, see xclaim_options below.

fn xclaim_options<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    consumer: C,
    min_idle_time: MIT,
    ids: &[ID],
    options: StreamClaimOptions
) -> RedisResult<RV>

This is the optional arguments version for claiming unacked, pending messages currently checked out by another consumer.

use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamClaimOptions,StreamClaimReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();

// Claim all pending messages for key "k1",
// from group "g1", checked out by consumer "c1"
// for 10ms with RETRYCOUNT 2 and FORCE

let opts = StreamClaimOptions::default()
    .with_force()
    .retry(2);
let results: RedisResult<StreamClaimReply> =
    con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);

// All optional arguments return a `Result<StreamClaimReply>` with one exception:
// Passing JUSTID returns only the message `id` and omits the HashMap for each message.

let opts = StreamClaimOptions::default()
    .with_justid();
let results: RedisResult<Vec<String>> =
    con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);

fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    ids: &[ID]
) -> RedisResult<RV>

Deletes a list of ids for a given stream key.

fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    id: ID
) -> RedisResult<RV>

This command is used for creating a consumer group. It expects the stream key to already exist. Otherwise, use xgroup_create_mkstream if it doesn't. The id is the starting message id all consumers should read from. Use $ If you want all consumers to read from the last message added to stream.

fn xgroup_create_mkstream<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    id: ID
) -> RedisResult<RV>

This is the alternate version for creating a consumer group which makes the stream if it doesn't exist.

fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    id: ID
) -> RedisResult<RV>

Alter which id you want consumers to begin reading from an existing consumer group.

fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G
) -> RedisResult<RV>

Destroy an existing consumer group for a given stream key

fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    group: G,
    consumer: C
) -> RedisResult<RV>

This deletes a consumer from an existing consumer group for given stream `key.

fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
    &mut self,
    key: K,
    group: G
) -> RedisResult<StreamInfoConsumersReply>

This returns all info details about which consumers have read messages for given consumer group. Take note of the StreamInfoConsumersReply return type.

It's possible this return value might not contain new fields added by Redis in future versions.

fn xinfo_groups<K: ToRedisArgs>(
    &mut self,
    key: K
) -> RedisResult<StreamInfoGroupsReply>

Returns all consumer groups created for a given stream key. Take note of the StreamInfoGroupsReply return type.

It's possible this return value might not contain new fields added by Redis in future versions.

fn xinfo_stream<K: ToRedisArgs>(
    &mut self,
    key: K
) -> RedisResult<StreamInfoStreamReply>

Returns info about high-level stream details (first & last message id, length, number of groups, etc.) Take note of the StreamInfoStreamReply return type.

It's possible this return value might not contain new fields added by Redis in future versions.

fn xlen<K: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K
) -> RedisResult<RV>

Returns the number of messages for a given stream key.

fn xpending<K: ToRedisArgs, G: ToRedisArgs>(
    &mut self,
    key: K,
    group: G
) -> RedisResult<StreamPendingReply>

This is a basic version of making XPENDING command calls which only passes a stream key and consumer group and it returns details about which consumers have pending messages that haven't been acked.

You can use this method along with xclaim or xclaim_options for determining which messages need to be retried.

Take note of the StreamPendingReply return type.

fn xpending_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
    &mut self,
    key: K,
    group: G,
    start: S,
    end: E,
    count: C
) -> RedisResult<StreamPendingCountReply>

This XPENDING version returns a list of all messages over the range. You can use this for paginating pending messages (but without the message HashMap).

Start and end follow the same rules xrange args. Set start to - and end to + for the entire stream.

Take note of the StreamPendingCountReply return type.

fn xpending_consumer_count<K: ToRedisArgs, G: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs, CN: ToRedisArgs>(
    &mut self,
    key: K,
    group: G,
    start: S,
    end: E,
    count: C,
    consumer: CN
) -> RedisResult<StreamPendingCountReply>

An alternate version of xpending_count which filters by consumer name.

Start and end follow the same rules xrange args. Set start to - and end to + for the entire stream.

Take note of the StreamPendingCountReply return type.

fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
    &mut self,
    key: K,
    start: S,
    end: E
) -> RedisResult<StreamRangeReply>

Returns a range of messages in a given stream key.

Set start to - to begin at the first message. Set end to + to end the most recent message. You can pass message id to both start and end.

Take note of the StreamRangeReply return type.

fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K
) -> RedisResult<RV>

A helper method for automatically returning all messages in a stream by key. Use with caution!

fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
    &mut self,
    key: K,
    start: S,
    end: E,
    count: C
) -> RedisResult<StreamRangeReply>

A method for paginating a stream by key.

fn xread<K: ToRedisArgs, ID: ToRedisArgs>(
    &mut self,
    keys: &[K],
    ids: &[ID]
) -> RedisResult<StreamReadReply>

Read a list of ids for each stream key. This is the basic form of reading streams. For more advanced control, like blocking, limiting, or reading by consumer group, see xread_options.

fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
    &mut self,
    keys: &[K],
    ids: &[ID],
    options: StreamReadOptions
) -> RedisResult<StreamReadReply>

This method handles setting optional arguments for XREAD or XREADGROUP Redis commands.

use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamReadOptions,StreamReadReply};
let client = client_open("redis://127.0.0.1/0").unwrap();
let mut con = client.get_connection().unwrap();

// Read 10 messages from the start of the stream,
// without registering as a consumer group.

let opts = StreamReadOptions::default()
    .count(10);
let results: RedisResult<StreamReadReply> =
    con.xread_options(&["k1"], &["0"], opts);

// Read all undelivered messages for a given
// consumer group. Be advised: the consumer group must already
// exist before making this call. Also note: we're passing
// '>' as the id here, which means all undelivered messages.

let opts = StreamReadOptions::default()
    .group("group-1", "consumer-1");
let results: RedisResult<StreamReadReply> =
    con.xread_options(&["k1"], &[">"], opts);

fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
    &mut self,
    key: K,
    end: E,
    start: S
) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange. The same rules apply for start and end here.

fn xrevrange_all<K: ToRedisArgs>(
    &mut self,
    key: K
) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange_all. The same rules apply for start and end here.

fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
    &mut self,
    key: K,
    end: E,
    start: S,
    count: C
) -> RedisResult<StreamRangeReply>

This is the reverse version of xrange_count. The same rules apply for start and end here.

fn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
    &mut self,
    key: K,
    maxlen: StreamMaxlen
) -> RedisResult<RV>

Trim a stream key to a MAXLEN count.

Loading content...

Implementors

impl<T> StreamCommands for T where
    T: ConnectionLike
[src]

Loading content...