redis_streams/
commands.rs

1use crate::types::{
2    StreamClaimOptions, StreamClaimReply, StreamInfoConsumersReply, StreamInfoGroupsReply,
3    StreamInfoStreamReply, StreamMaxlen, StreamPendingCountReply, StreamPendingReply,
4    StreamRangeReply, StreamReadOptions, StreamReadReply,
5};
6
7use redis::{cmd, ConnectionLike, FromRedisValue, RedisResult, ToRedisArgs};
8
9/// Implementation of all redis stream commands.
10///
11pub trait StreamCommands: ConnectionLike + Sized {
12    // XACK <key> <group> <id> <id> ... <id>
13
14    /// Ack pending stream messages checked out by a consumer.
15    ///
16    #[inline]
17    fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
18        &mut self,
19        key: K,
20        group: G,
21        ids: &[ID],
22    ) -> RedisResult<RV> {
23        cmd("XACK").arg(key).arg(group).arg(ids).query(self)
24    }
25
26    // XADD key <ID or *> [field value] [field value] ...
27
28    /// Add a stream message by `key`. Use `*` as the `id` for the current timestamp.
29    ///
30    #[inline]
31    fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
32        &mut self,
33        key: K,
34        id: ID,
35        items: &[(F, V)],
36    ) -> RedisResult<RV> {
37        cmd("XADD").arg(key).arg(id).arg(items).query(self)
38    }
39
40    // XADD key <ID or *> [rust BTreeMap] ...
41
42    /// BTreeMap variant for adding a stream message by `key`.
43    /// Use `*` as the `id` for the current timestamp.
44    ///
45    #[inline]
46    fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
47        &mut self,
48        key: K,
49        id: ID,
50        map: BTM,
51    ) -> RedisResult<RV> {
52        cmd("XADD").arg(key).arg(id).arg(map).query(self)
53    }
54
55    // XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...
56
57    /// Add a stream message while capping the stream at a maxlength.
58    ///
59    #[inline]
60    fn xadd_maxlen<
61        K: ToRedisArgs,
62        ID: ToRedisArgs,
63        F: ToRedisArgs,
64        V: ToRedisArgs,
65        RV: FromRedisValue,
66    >(
67        &mut self,
68        key: K,
69        maxlen: StreamMaxlen,
70        id: ID,
71        items: &[(F, V)],
72    ) -> RedisResult<RV> {
73        cmd("XADD")
74            .arg(key)
75            .arg(maxlen)
76            .arg(id)
77            .arg(items)
78            .query(self)
79    }
80
81    // XADD key [MAXLEN [~|=] <count>] <ID or *> [rust BTreeMap] ...
82
83    /// BTreeMap variant for adding a stream message while capping the stream at a maxlength.
84    ///
85    #[inline]
86    fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
87        &mut self,
88        key: K,
89        maxlen: StreamMaxlen,
90        id: ID,
91        map: BTM,
92    ) -> RedisResult<RV> {
93        cmd("XADD")
94            .arg(key)
95            .arg(maxlen)
96            .arg(id)
97            .arg(map)
98            .query(self)
99    }
100
101    // XCLAIM <key> <group> <consumer> <min-idle-time> [<ID-1> <ID-2>]
102
103    /// Claim pending, unacked messages, after some period of time,
104    /// currently checked out by another consumer.
105    ///
106    /// This method only accepts the must-have arguments for claiming messages.
107    /// If optional arguments are required, see `xclaim_options` below.
108    ///
109    #[inline]
110    fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
111        &mut self,
112        key: K,
113        group: G,
114        consumer: C,
115        min_idle_time: MIT,
116        ids: &[ID],
117    ) -> RedisResult<StreamClaimReply> {
118        cmd("XCLAIM")
119            .arg(key)
120            .arg(group)
121            .arg(consumer)
122            .arg(min_idle_time)
123            .arg(ids)
124            .query(self)
125    }
126
127    // XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
128    //     [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
129    //     [FORCE] [JUSTID]
130
131    /// This is the optional arguments version for claiming unacked, pending messages
132    /// currently checked out by another consumer.
133    ///
134    /// ```no_run
135    /// use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamClaimOptions,StreamClaimReply};
136    /// let client = client_open("redis://127.0.0.1/0").unwrap();
137    /// let mut con = client.get_connection().unwrap();
138    ///
139    /// // Claim all pending messages for key "k1",
140    /// // from group "g1", checked out by consumer "c1"
141    /// // for 10ms with RETRYCOUNT 2 and FORCE
142    ///
143    /// let opts = StreamClaimOptions::default()
144    ///     .with_force()
145    ///     .retry(2);
146    /// let results: RedisResult<StreamClaimReply> =
147    ///     con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
148    ///
149    /// // All optional arguments return a `Result<StreamClaimReply>` with one exception:
150    /// // Passing JUSTID returns only the message `id` and omits the HashMap for each message.
151    ///
152    /// let opts = StreamClaimOptions::default()
153    ///     .with_justid();
154    /// let results: RedisResult<Vec<String>> =
155    ///     con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
156    /// ```
157    ///
158    #[inline]
159    fn xclaim_options<
160        K: ToRedisArgs,
161        G: ToRedisArgs,
162        C: ToRedisArgs,
163        MIT: ToRedisArgs,
164        ID: ToRedisArgs,
165        RV: FromRedisValue,
166    >(
167        &mut self,
168        key: K,
169        group: G,
170        consumer: C,
171        min_idle_time: MIT,
172        ids: &[ID],
173        options: StreamClaimOptions,
174    ) -> RedisResult<RV> {
175        cmd("XCLAIM")
176            .arg(key)
177            .arg(group)
178            .arg(consumer)
179            .arg(min_idle_time)
180            .arg(ids)
181            .arg(options)
182            .query(self)
183    }
184
185    // XDEL <key> [<ID1> <ID2> ... <IDN>]
186
187    /// Deletes a list of `id`s for a given stream `key`.
188    ///
189    #[inline]
190    fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
191        &mut self,
192        key: K,
193        ids: &[ID],
194    ) -> RedisResult<RV> {
195        cmd("XDEL").arg(key).arg(ids).query(self)
196    }
197
198    // XGROUP CREATE <key> <groupname> <id or $>
199
200    /// This command is used for creating a consumer `group`. It expects the stream key
201    /// to already exist. Otherwise, use `xgroup_create_mkstream` if it doesn't.
202    /// The `id` is the starting message id all consumers should read from. Use `$` If you want
203    /// all consumers to read from the last message added to stream.
204    ///
205    #[inline]
206    fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
207        &mut self,
208        key: K,
209        group: G,
210        id: ID,
211    ) -> RedisResult<RV> {
212        cmd("XGROUP")
213            .arg("CREATE")
214            .arg(key)
215            .arg(group)
216            .arg(id)
217            .query(self)
218    }
219
220    // XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
221
222    /// This is the alternate version for creating a consumer `group`
223    /// which makes the stream if it doesn't exist.
224    ///
225    #[inline]
226    fn xgroup_create_mkstream<
227        K: ToRedisArgs,
228        G: ToRedisArgs,
229        ID: ToRedisArgs,
230        RV: FromRedisValue,
231    >(
232        &mut self,
233        key: K,
234        group: G,
235        id: ID,
236    ) -> RedisResult<RV> {
237        cmd("XGROUP")
238            .arg("CREATE")
239            .arg(key)
240            .arg(group)
241            .arg(id)
242            .arg("MKSTREAM")
243            .query(self)
244    }
245
246    // XGROUP SETID <key> <groupname> <id or $>
247
248    /// Alter which `id` you want consumers to begin reading from an existing
249    /// consumer `group`.
250    ///
251    #[inline]
252    fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
253        &mut self,
254        key: K,
255        group: G,
256        id: ID,
257    ) -> RedisResult<RV> {
258        cmd("XGROUP")
259            .arg("SETID")
260            .arg(key)
261            .arg(group)
262            .arg(id)
263            .query(self)
264    }
265
266    // XGROUP DESTROY <key> <groupname>
267
268    /// Destroy an existing consumer `group` for a given stream `key`
269    ///
270    #[inline]
271    fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
272        &mut self,
273        key: K,
274        group: G,
275    ) -> RedisResult<RV> {
276        cmd("XGROUP").arg("DESTROY").arg(key).arg(group).query(self)
277    }
278
279    // XGROUP DELCONSUMER <key> <groupname> <consumername>
280
281    /// This deletes a `consumer` from an existing consumer `group`
282    /// for given stream `key.
283    ///
284    #[inline]
285    fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
286        &mut self,
287        key: K,
288        group: G,
289        consumer: C,
290    ) -> RedisResult<RV> {
291        cmd("XGROUP")
292            .arg("DELCONSUMER")
293            .arg(key)
294            .arg(group)
295            .arg(consumer)
296            .query(self)
297    }
298
299    // XINFO CONSUMERS <key> <group>
300
301    /// This returns all info details about
302    /// which consumers have read messages for given consumer `group`.
303    /// Take note of the StreamInfoConsumersReply return type.
304    ///
305    /// *It's possible this return value might not contain new fields
306    /// added by Redis in future versions.*
307    ///
308    #[inline]
309    fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
310        &mut self,
311        key: K,
312        group: G,
313    ) -> RedisResult<StreamInfoConsumersReply> {
314        cmd("XINFO")
315            .arg("CONSUMERS")
316            .arg(key)
317            .arg(group)
318            .query(self)
319    }
320
321    // XINFO GROUPS <key>
322
323    /// Returns all consumer `group`s created for a given stream `key`.
324    /// Take note of the StreamInfoGroupsReply return type.
325    ///
326    /// *It's possible this return value might not contain new fields
327    /// added by Redis in future versions.*
328    ///
329    #[inline]
330    fn xinfo_groups<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamInfoGroupsReply> {
331        cmd("XINFO").arg("GROUPS").arg(key).query(self)
332    }
333
334    // XINFO STREAM <key>
335
336    /// Returns info about high-level stream details
337    /// (first & last message `id`, length, number of groups, etc.)
338    /// Take note of the StreamInfoStreamReply return type.
339    ///
340    /// *It's possible this return value might not contain new fields
341    /// added by Redis in future versions.*
342    ///
343    #[inline]
344    fn xinfo_stream<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamInfoStreamReply> {
345        cmd("XINFO").arg("STREAM").arg(key).query(self)
346    }
347
348    // XLEN <key>
349    /// Returns the number of messages for a given stream `key`.
350    ///
351    #[inline]
352    fn xlen<K: ToRedisArgs, RV: FromRedisValue>(&mut self, key: K) -> RedisResult<RV> {
353        cmd("XLEN").arg(key).query(self)
354    }
355
356    // XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
357
358    /// This is a basic version of making XPENDING command calls which only
359    /// passes a stream `key` and consumer `group` and it
360    /// returns details about which consumers have pending messages
361    /// that haven't been acked.
362    ///
363    /// You can use this method along with
364    /// `xclaim` or `xclaim_options` for determining which messages
365    /// need to be retried.
366    ///
367    /// Take note of the StreamPendingReply return type.
368    ///
369    #[inline]
370    fn xpending<K: ToRedisArgs, G: ToRedisArgs>(
371        &mut self,
372        key: K,
373        group: G,
374    ) -> RedisResult<StreamPendingReply> {
375        cmd("XPENDING").arg(key).arg(group).query(self)
376    }
377
378    // XPENDING <key> <group> <start> <stop> <count>
379
380    /// This XPENDING version returns a list of all messages over the range.
381    /// You can use this for paginating pending messages (but without the message HashMap).
382    ///
383    /// Start and end follow the same rules `xrange` args. Set start to `-`
384    /// and end to `+` for the entire stream.
385    ///
386    /// Take note of the StreamPendingCountReply return type.
387    ///
388    #[inline]
389    fn xpending_count<
390        K: ToRedisArgs,
391        G: ToRedisArgs,
392        S: ToRedisArgs,
393        E: ToRedisArgs,
394        C: ToRedisArgs,
395    >(
396        &mut self,
397        key: K,
398        group: G,
399        start: S,
400        end: E,
401        count: C,
402    ) -> RedisResult<StreamPendingCountReply> {
403        cmd("XPENDING")
404            .arg(key)
405            .arg(group)
406            .arg(start)
407            .arg(end)
408            .arg(count)
409            .query(self)
410    }
411
412    // XPENDING <key> <group> <start> <stop> <count> <consumer>
413
414    /// An alternate version of `xpending_count` which filters by `consumer` name.
415    ///
416    /// Start and end follow the same rules `xrange` args. Set start to `-`
417    /// and end to `+` for the entire stream.
418    ///
419    /// Take note of the StreamPendingCountReply return type.
420    ///
421    #[inline]
422    fn xpending_consumer_count<
423        K: ToRedisArgs,
424        G: ToRedisArgs,
425        S: ToRedisArgs,
426        E: ToRedisArgs,
427        C: ToRedisArgs,
428        CN: ToRedisArgs,
429    >(
430        &mut self,
431        key: K,
432        group: G,
433        start: S,
434        end: E,
435        count: C,
436        consumer: CN,
437    ) -> RedisResult<StreamPendingCountReply> {
438        cmd("XPENDING")
439            .arg(key)
440            .arg(group)
441            .arg(start)
442            .arg(end)
443            .arg(count)
444            .arg(consumer)
445            .query(self)
446    }
447
448    // XRANGE key start end
449
450    /// Returns a range of messages in a given stream `key`.
451    ///
452    /// Set `start` to `-` to begin at the first message.
453    /// Set `end` to `+` to end the most recent message.
454    /// You can pass message `id` to both `start` and `end`.
455    ///
456    /// Take note of the StreamRangeReply return type.
457    ///
458    #[inline]
459    fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
460        &mut self,
461        key: K,
462        start: S,
463        end: E,
464    ) -> RedisResult<StreamRangeReply> {
465        cmd("XRANGE").arg(key).arg(start).arg(end).query(self)
466    }
467
468    // XRANGE key - +
469
470    /// A helper method for automatically returning all messages in a stream by `key`.
471    /// **Use with caution!**
472    ///
473    #[inline]
474    fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(&mut self, key: K) -> RedisResult<RV> {
475        cmd("XRANGE").arg(key).arg("-").arg("+").query(self)
476    }
477
478    // XRANGE key start end [COUNT <n>]
479
480    /// A method for paginating a stream by `key`.
481    ///
482    #[inline]
483    fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
484        &mut self,
485        key: K,
486        start: S,
487        end: E,
488        count: C,
489    ) -> RedisResult<StreamRangeReply> {
490        cmd("XRANGE")
491            .arg(key)
492            .arg(start)
493            .arg(end)
494            .arg("COUNT")
495            .arg(count)
496            .query(self)
497    }
498
499    // XREAD STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
500
501    /// Read a list of `id`s for each stream `key`.
502    /// This is the basic form of reading streams.
503    /// For more advanced control, like blocking, limiting, or reading by consumer `group`,
504    /// see `xread_options`.
505    ///
506    #[inline]
507    fn xread<K: ToRedisArgs, ID: ToRedisArgs>(
508        &mut self,
509        keys: &[K],
510        ids: &[ID],
511    ) -> RedisResult<StreamReadReply> {
512        cmd("XREAD").arg("STREAMS").arg(keys).arg(ids).query(self)
513    }
514
515    // XREAD [BLOCK <milliseconds>] [COUNT <count>]
516    //       STREAMS key_1 key_2 ... key_N
517    //       ID_1 ID_2 ... ID_N
518    // XREADGROUP [BLOCK <milliseconds>] [COUNT <count>] [NOACK] [GROUP group-name consumer-name]
519    //       STREAMS key_1 key_2 ... key_N
520    //       ID_1 ID_2 ... ID_N
521
522    /// This method handles setting optional arguments for
523    /// `XREAD` or `XREADGROUP` Redis commands.
524    /// ```no_run
525    /// use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamReadOptions,StreamReadReply};
526    /// let client = client_open("redis://127.0.0.1/0").unwrap();
527    /// let mut con = client.get_connection().unwrap();
528    ///
529    /// // Read 10 messages from the start of the stream,
530    /// // without registering as a consumer group.
531    ///
532    /// let opts = StreamReadOptions::default()
533    ///     .count(10);
534    /// let results: RedisResult<StreamReadReply> =
535    ///     con.xread_options(&["k1"], &["0"], opts);
536    ///
537    /// // Read all undelivered messages for a given
538    /// // consumer group. Be advised: the consumer group must already
539    /// // exist before making this call. Also note: we're passing
540    /// // '>' as the id here, which means all undelivered messages.
541    ///
542    /// let opts = StreamReadOptions::default()
543    ///     .group("group-1", "consumer-1");
544    /// let results: RedisResult<StreamReadReply> =
545    ///     con.xread_options(&["k1"], &[">"], opts);
546    /// ```
547    ///
548    #[inline]
549    fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
550        &mut self,
551        keys: &[K],
552        ids: &[ID],
553        options: StreamReadOptions,
554    ) -> RedisResult<StreamReadReply> {
555        cmd(if options.read_only() {
556            "XREAD"
557        } else {
558            "XREADGROUP"
559        })
560        .arg(options)
561        .arg("STREAMS")
562        .arg(keys)
563        .arg(ids)
564        .query(self)
565    }
566
567    // XREVRANGE key end start
568
569    /// This is the reverse version of `xrange`.
570    /// The same rules apply for `start` and `end` here.
571    ///
572    #[inline]
573    fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
574        &mut self,
575        key: K,
576        end: E,
577        start: S,
578    ) -> RedisResult<StreamRangeReply> {
579        cmd("XREVRANGE").arg(key).arg(end).arg(start).query(self)
580    }
581
582    // XREVRANGE key + -
583
584    /// This is the reverse version of `xrange_all`.
585    /// The same rules apply for `start` and `end` here.
586    ///
587    fn xrevrange_all<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamRangeReply> {
588        cmd("XREVRANGE").arg(key).arg("+").arg("-").query(self)
589    }
590
591    // XREVRANGE key end start [COUNT <n>]
592
593    /// This is the reverse version of `xrange_count`.
594    /// The same rules apply for `start` and `end` here.
595    ///
596    #[inline]
597    fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
598        &mut self,
599        key: K,
600        end: E,
601        start: S,
602        count: C,
603    ) -> RedisResult<StreamRangeReply> {
604        cmd("XREVRANGE")
605            .arg(key)
606            .arg(end)
607            .arg(start)
608            .arg("COUNT")
609            .arg(count)
610            .query(self)
611    }
612
613    // XTRIM <key> MAXLEN [~|=] <count>  (Same as XADD MAXLEN option)
614
615    /// Trim a stream `key` to a MAXLEN count.
616    ///
617    #[inline]
618    fn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
619        &mut self,
620        key: K,
621        maxlen: StreamMaxlen,
622    ) -> RedisResult<RV> {
623        cmd("XTRIM").arg(key).arg(maxlen).query(self)
624    }
625}
626
627impl<T> StreamCommands for T where T: ConnectionLike {}