redis_streams/commands.rs
1use crate::types::{
2 StreamClaimOptions, StreamClaimReply, StreamInfoConsumersReply, StreamInfoGroupsReply,
3 StreamInfoStreamReply, StreamMaxlen, StreamPendingCountReply, StreamPendingReply,
4 StreamRangeReply, StreamReadOptions, StreamReadReply,
5};
6
7use redis::{cmd, ConnectionLike, FromRedisValue, RedisResult, ToRedisArgs};
8
9/// Implementation of all redis stream commands.
10///
11pub trait StreamCommands: ConnectionLike + Sized {
12 // XACK <key> <group> <id> <id> ... <id>
13
14 /// Ack pending stream messages checked out by a consumer.
15 ///
16 #[inline]
17 fn xack<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
18 &mut self,
19 key: K,
20 group: G,
21 ids: &[ID],
22 ) -> RedisResult<RV> {
23 cmd("XACK").arg(key).arg(group).arg(ids).query(self)
24 }
25
26 // XADD key <ID or *> [field value] [field value] ...
27
28 /// Add a stream message by `key`. Use `*` as the `id` for the current timestamp.
29 ///
30 #[inline]
31 fn xadd<K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs, RV: FromRedisValue>(
32 &mut self,
33 key: K,
34 id: ID,
35 items: &[(F, V)],
36 ) -> RedisResult<RV> {
37 cmd("XADD").arg(key).arg(id).arg(items).query(self)
38 }
39
40 // XADD key <ID or *> [rust BTreeMap] ...
41
42 /// BTreeMap variant for adding a stream message by `key`.
43 /// Use `*` as the `id` for the current timestamp.
44 ///
45 #[inline]
46 fn xadd_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
47 &mut self,
48 key: K,
49 id: ID,
50 map: BTM,
51 ) -> RedisResult<RV> {
52 cmd("XADD").arg(key).arg(id).arg(map).query(self)
53 }
54
55 // XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...
56
57 /// Add a stream message while capping the stream at a maxlength.
58 ///
59 #[inline]
60 fn xadd_maxlen<
61 K: ToRedisArgs,
62 ID: ToRedisArgs,
63 F: ToRedisArgs,
64 V: ToRedisArgs,
65 RV: FromRedisValue,
66 >(
67 &mut self,
68 key: K,
69 maxlen: StreamMaxlen,
70 id: ID,
71 items: &[(F, V)],
72 ) -> RedisResult<RV> {
73 cmd("XADD")
74 .arg(key)
75 .arg(maxlen)
76 .arg(id)
77 .arg(items)
78 .query(self)
79 }
80
81 // XADD key [MAXLEN [~|=] <count>] <ID or *> [rust BTreeMap] ...
82
83 /// BTreeMap variant for adding a stream message while capping the stream at a maxlength.
84 ///
85 #[inline]
86 fn xadd_maxlen_map<K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs, RV: FromRedisValue>(
87 &mut self,
88 key: K,
89 maxlen: StreamMaxlen,
90 id: ID,
91 map: BTM,
92 ) -> RedisResult<RV> {
93 cmd("XADD")
94 .arg(key)
95 .arg(maxlen)
96 .arg(id)
97 .arg(map)
98 .query(self)
99 }
100
101 // XCLAIM <key> <group> <consumer> <min-idle-time> [<ID-1> <ID-2>]
102
103 /// Claim pending, unacked messages, after some period of time,
104 /// currently checked out by another consumer.
105 ///
106 /// This method only accepts the must-have arguments for claiming messages.
107 /// If optional arguments are required, see `xclaim_options` below.
108 ///
109 #[inline]
110 fn xclaim<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, MIT: ToRedisArgs, ID: ToRedisArgs>(
111 &mut self,
112 key: K,
113 group: G,
114 consumer: C,
115 min_idle_time: MIT,
116 ids: &[ID],
117 ) -> RedisResult<StreamClaimReply> {
118 cmd("XCLAIM")
119 .arg(key)
120 .arg(group)
121 .arg(consumer)
122 .arg(min_idle_time)
123 .arg(ids)
124 .query(self)
125 }
126
127 // XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
128 // [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
129 // [FORCE] [JUSTID]
130
131 /// This is the optional arguments version for claiming unacked, pending messages
132 /// currently checked out by another consumer.
133 ///
134 /// ```no_run
135 /// use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamClaimOptions,StreamClaimReply};
136 /// let client = client_open("redis://127.0.0.1/0").unwrap();
137 /// let mut con = client.get_connection().unwrap();
138 ///
139 /// // Claim all pending messages for key "k1",
140 /// // from group "g1", checked out by consumer "c1"
141 /// // for 10ms with RETRYCOUNT 2 and FORCE
142 ///
143 /// let opts = StreamClaimOptions::default()
144 /// .with_force()
145 /// .retry(2);
146 /// let results: RedisResult<StreamClaimReply> =
147 /// con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
148 ///
149 /// // All optional arguments return a `Result<StreamClaimReply>` with one exception:
150 /// // Passing JUSTID returns only the message `id` and omits the HashMap for each message.
151 ///
152 /// let opts = StreamClaimOptions::default()
153 /// .with_justid();
154 /// let results: RedisResult<Vec<String>> =
155 /// con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts);
156 /// ```
157 ///
158 #[inline]
159 fn xclaim_options<
160 K: ToRedisArgs,
161 G: ToRedisArgs,
162 C: ToRedisArgs,
163 MIT: ToRedisArgs,
164 ID: ToRedisArgs,
165 RV: FromRedisValue,
166 >(
167 &mut self,
168 key: K,
169 group: G,
170 consumer: C,
171 min_idle_time: MIT,
172 ids: &[ID],
173 options: StreamClaimOptions,
174 ) -> RedisResult<RV> {
175 cmd("XCLAIM")
176 .arg(key)
177 .arg(group)
178 .arg(consumer)
179 .arg(min_idle_time)
180 .arg(ids)
181 .arg(options)
182 .query(self)
183 }
184
185 // XDEL <key> [<ID1> <ID2> ... <IDN>]
186
187 /// Deletes a list of `id`s for a given stream `key`.
188 ///
189 #[inline]
190 fn xdel<K: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
191 &mut self,
192 key: K,
193 ids: &[ID],
194 ) -> RedisResult<RV> {
195 cmd("XDEL").arg(key).arg(ids).query(self)
196 }
197
198 // XGROUP CREATE <key> <groupname> <id or $>
199
200 /// This command is used for creating a consumer `group`. It expects the stream key
201 /// to already exist. Otherwise, use `xgroup_create_mkstream` if it doesn't.
202 /// The `id` is the starting message id all consumers should read from. Use `$` If you want
203 /// all consumers to read from the last message added to stream.
204 ///
205 #[inline]
206 fn xgroup_create<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
207 &mut self,
208 key: K,
209 group: G,
210 id: ID,
211 ) -> RedisResult<RV> {
212 cmd("XGROUP")
213 .arg("CREATE")
214 .arg(key)
215 .arg(group)
216 .arg(id)
217 .query(self)
218 }
219
220 // XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
221
222 /// This is the alternate version for creating a consumer `group`
223 /// which makes the stream if it doesn't exist.
224 ///
225 #[inline]
226 fn xgroup_create_mkstream<
227 K: ToRedisArgs,
228 G: ToRedisArgs,
229 ID: ToRedisArgs,
230 RV: FromRedisValue,
231 >(
232 &mut self,
233 key: K,
234 group: G,
235 id: ID,
236 ) -> RedisResult<RV> {
237 cmd("XGROUP")
238 .arg("CREATE")
239 .arg(key)
240 .arg(group)
241 .arg(id)
242 .arg("MKSTREAM")
243 .query(self)
244 }
245
246 // XGROUP SETID <key> <groupname> <id or $>
247
248 /// Alter which `id` you want consumers to begin reading from an existing
249 /// consumer `group`.
250 ///
251 #[inline]
252 fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs, RV: FromRedisValue>(
253 &mut self,
254 key: K,
255 group: G,
256 id: ID,
257 ) -> RedisResult<RV> {
258 cmd("XGROUP")
259 .arg("SETID")
260 .arg(key)
261 .arg(group)
262 .arg(id)
263 .query(self)
264 }
265
266 // XGROUP DESTROY <key> <groupname>
267
268 /// Destroy an existing consumer `group` for a given stream `key`
269 ///
270 #[inline]
271 fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs, RV: FromRedisValue>(
272 &mut self,
273 key: K,
274 group: G,
275 ) -> RedisResult<RV> {
276 cmd("XGROUP").arg("DESTROY").arg(key).arg(group).query(self)
277 }
278
279 // XGROUP DELCONSUMER <key> <groupname> <consumername>
280
281 /// This deletes a `consumer` from an existing consumer `group`
282 /// for given stream `key.
283 ///
284 #[inline]
285 fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs, RV: FromRedisValue>(
286 &mut self,
287 key: K,
288 group: G,
289 consumer: C,
290 ) -> RedisResult<RV> {
291 cmd("XGROUP")
292 .arg("DELCONSUMER")
293 .arg(key)
294 .arg(group)
295 .arg(consumer)
296 .query(self)
297 }
298
299 // XINFO CONSUMERS <key> <group>
300
301 /// This returns all info details about
302 /// which consumers have read messages for given consumer `group`.
303 /// Take note of the StreamInfoConsumersReply return type.
304 ///
305 /// *It's possible this return value might not contain new fields
306 /// added by Redis in future versions.*
307 ///
308 #[inline]
309 fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
310 &mut self,
311 key: K,
312 group: G,
313 ) -> RedisResult<StreamInfoConsumersReply> {
314 cmd("XINFO")
315 .arg("CONSUMERS")
316 .arg(key)
317 .arg(group)
318 .query(self)
319 }
320
321 // XINFO GROUPS <key>
322
323 /// Returns all consumer `group`s created for a given stream `key`.
324 /// Take note of the StreamInfoGroupsReply return type.
325 ///
326 /// *It's possible this return value might not contain new fields
327 /// added by Redis in future versions.*
328 ///
329 #[inline]
330 fn xinfo_groups<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamInfoGroupsReply> {
331 cmd("XINFO").arg("GROUPS").arg(key).query(self)
332 }
333
334 // XINFO STREAM <key>
335
336 /// Returns info about high-level stream details
337 /// (first & last message `id`, length, number of groups, etc.)
338 /// Take note of the StreamInfoStreamReply return type.
339 ///
340 /// *It's possible this return value might not contain new fields
341 /// added by Redis in future versions.*
342 ///
343 #[inline]
344 fn xinfo_stream<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamInfoStreamReply> {
345 cmd("XINFO").arg("STREAM").arg(key).query(self)
346 }
347
348 // XLEN <key>
349 /// Returns the number of messages for a given stream `key`.
350 ///
351 #[inline]
352 fn xlen<K: ToRedisArgs, RV: FromRedisValue>(&mut self, key: K) -> RedisResult<RV> {
353 cmd("XLEN").arg(key).query(self)
354 }
355
356 // XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
357
358 /// This is a basic version of making XPENDING command calls which only
359 /// passes a stream `key` and consumer `group` and it
360 /// returns details about which consumers have pending messages
361 /// that haven't been acked.
362 ///
363 /// You can use this method along with
364 /// `xclaim` or `xclaim_options` for determining which messages
365 /// need to be retried.
366 ///
367 /// Take note of the StreamPendingReply return type.
368 ///
369 #[inline]
370 fn xpending<K: ToRedisArgs, G: ToRedisArgs>(
371 &mut self,
372 key: K,
373 group: G,
374 ) -> RedisResult<StreamPendingReply> {
375 cmd("XPENDING").arg(key).arg(group).query(self)
376 }
377
378 // XPENDING <key> <group> <start> <stop> <count>
379
380 /// This XPENDING version returns a list of all messages over the range.
381 /// You can use this for paginating pending messages (but without the message HashMap).
382 ///
383 /// Start and end follow the same rules `xrange` args. Set start to `-`
384 /// and end to `+` for the entire stream.
385 ///
386 /// Take note of the StreamPendingCountReply return type.
387 ///
388 #[inline]
389 fn xpending_count<
390 K: ToRedisArgs,
391 G: ToRedisArgs,
392 S: ToRedisArgs,
393 E: ToRedisArgs,
394 C: ToRedisArgs,
395 >(
396 &mut self,
397 key: K,
398 group: G,
399 start: S,
400 end: E,
401 count: C,
402 ) -> RedisResult<StreamPendingCountReply> {
403 cmd("XPENDING")
404 .arg(key)
405 .arg(group)
406 .arg(start)
407 .arg(end)
408 .arg(count)
409 .query(self)
410 }
411
412 // XPENDING <key> <group> <start> <stop> <count> <consumer>
413
414 /// An alternate version of `xpending_count` which filters by `consumer` name.
415 ///
416 /// Start and end follow the same rules `xrange` args. Set start to `-`
417 /// and end to `+` for the entire stream.
418 ///
419 /// Take note of the StreamPendingCountReply return type.
420 ///
421 #[inline]
422 fn xpending_consumer_count<
423 K: ToRedisArgs,
424 G: ToRedisArgs,
425 S: ToRedisArgs,
426 E: ToRedisArgs,
427 C: ToRedisArgs,
428 CN: ToRedisArgs,
429 >(
430 &mut self,
431 key: K,
432 group: G,
433 start: S,
434 end: E,
435 count: C,
436 consumer: CN,
437 ) -> RedisResult<StreamPendingCountReply> {
438 cmd("XPENDING")
439 .arg(key)
440 .arg(group)
441 .arg(start)
442 .arg(end)
443 .arg(count)
444 .arg(consumer)
445 .query(self)
446 }
447
448 // XRANGE key start end
449
450 /// Returns a range of messages in a given stream `key`.
451 ///
452 /// Set `start` to `-` to begin at the first message.
453 /// Set `end` to `+` to end the most recent message.
454 /// You can pass message `id` to both `start` and `end`.
455 ///
456 /// Take note of the StreamRangeReply return type.
457 ///
458 #[inline]
459 fn xrange<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs>(
460 &mut self,
461 key: K,
462 start: S,
463 end: E,
464 ) -> RedisResult<StreamRangeReply> {
465 cmd("XRANGE").arg(key).arg(start).arg(end).query(self)
466 }
467
468 // XRANGE key - +
469
470 /// A helper method for automatically returning all messages in a stream by `key`.
471 /// **Use with caution!**
472 ///
473 #[inline]
474 fn xrange_all<K: ToRedisArgs, RV: FromRedisValue>(&mut self, key: K) -> RedisResult<RV> {
475 cmd("XRANGE").arg(key).arg("-").arg("+").query(self)
476 }
477
478 // XRANGE key start end [COUNT <n>]
479
480 /// A method for paginating a stream by `key`.
481 ///
482 #[inline]
483 fn xrange_count<K: ToRedisArgs, S: ToRedisArgs, E: ToRedisArgs, C: ToRedisArgs>(
484 &mut self,
485 key: K,
486 start: S,
487 end: E,
488 count: C,
489 ) -> RedisResult<StreamRangeReply> {
490 cmd("XRANGE")
491 .arg(key)
492 .arg(start)
493 .arg(end)
494 .arg("COUNT")
495 .arg(count)
496 .query(self)
497 }
498
499 // XREAD STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
500
501 /// Read a list of `id`s for each stream `key`.
502 /// This is the basic form of reading streams.
503 /// For more advanced control, like blocking, limiting, or reading by consumer `group`,
504 /// see `xread_options`.
505 ///
506 #[inline]
507 fn xread<K: ToRedisArgs, ID: ToRedisArgs>(
508 &mut self,
509 keys: &[K],
510 ids: &[ID],
511 ) -> RedisResult<StreamReadReply> {
512 cmd("XREAD").arg("STREAMS").arg(keys).arg(ids).query(self)
513 }
514
515 // XREAD [BLOCK <milliseconds>] [COUNT <count>]
516 // STREAMS key_1 key_2 ... key_N
517 // ID_1 ID_2 ... ID_N
518 // XREADGROUP [BLOCK <milliseconds>] [COUNT <count>] [NOACK] [GROUP group-name consumer-name]
519 // STREAMS key_1 key_2 ... key_N
520 // ID_1 ID_2 ... ID_N
521
522 /// This method handles setting optional arguments for
523 /// `XREAD` or `XREADGROUP` Redis commands.
524 /// ```no_run
525 /// use redis_streams::{client_open,Connection,RedisResult,StreamCommands,StreamReadOptions,StreamReadReply};
526 /// let client = client_open("redis://127.0.0.1/0").unwrap();
527 /// let mut con = client.get_connection().unwrap();
528 ///
529 /// // Read 10 messages from the start of the stream,
530 /// // without registering as a consumer group.
531 ///
532 /// let opts = StreamReadOptions::default()
533 /// .count(10);
534 /// let results: RedisResult<StreamReadReply> =
535 /// con.xread_options(&["k1"], &["0"], opts);
536 ///
537 /// // Read all undelivered messages for a given
538 /// // consumer group. Be advised: the consumer group must already
539 /// // exist before making this call. Also note: we're passing
540 /// // '>' as the id here, which means all undelivered messages.
541 ///
542 /// let opts = StreamReadOptions::default()
543 /// .group("group-1", "consumer-1");
544 /// let results: RedisResult<StreamReadReply> =
545 /// con.xread_options(&["k1"], &[">"], opts);
546 /// ```
547 ///
548 #[inline]
549 fn xread_options<K: ToRedisArgs, ID: ToRedisArgs>(
550 &mut self,
551 keys: &[K],
552 ids: &[ID],
553 options: StreamReadOptions,
554 ) -> RedisResult<StreamReadReply> {
555 cmd(if options.read_only() {
556 "XREAD"
557 } else {
558 "XREADGROUP"
559 })
560 .arg(options)
561 .arg("STREAMS")
562 .arg(keys)
563 .arg(ids)
564 .query(self)
565 }
566
567 // XREVRANGE key end start
568
569 /// This is the reverse version of `xrange`.
570 /// The same rules apply for `start` and `end` here.
571 ///
572 #[inline]
573 fn xrevrange<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs>(
574 &mut self,
575 key: K,
576 end: E,
577 start: S,
578 ) -> RedisResult<StreamRangeReply> {
579 cmd("XREVRANGE").arg(key).arg(end).arg(start).query(self)
580 }
581
582 // XREVRANGE key + -
583
584 /// This is the reverse version of `xrange_all`.
585 /// The same rules apply for `start` and `end` here.
586 ///
587 fn xrevrange_all<K: ToRedisArgs>(&mut self, key: K) -> RedisResult<StreamRangeReply> {
588 cmd("XREVRANGE").arg(key).arg("+").arg("-").query(self)
589 }
590
591 // XREVRANGE key end start [COUNT <n>]
592
593 /// This is the reverse version of `xrange_count`.
594 /// The same rules apply for `start` and `end` here.
595 ///
596 #[inline]
597 fn xrevrange_count<K: ToRedisArgs, E: ToRedisArgs, S: ToRedisArgs, C: ToRedisArgs>(
598 &mut self,
599 key: K,
600 end: E,
601 start: S,
602 count: C,
603 ) -> RedisResult<StreamRangeReply> {
604 cmd("XREVRANGE")
605 .arg(key)
606 .arg(end)
607 .arg(start)
608 .arg("COUNT")
609 .arg(count)
610 .query(self)
611 }
612
613 // XTRIM <key> MAXLEN [~|=] <count> (Same as XADD MAXLEN option)
614
615 /// Trim a stream `key` to a MAXLEN count.
616 ///
617 #[inline]
618 fn xtrim<K: ToRedisArgs, RV: FromRedisValue>(
619 &mut self,
620 key: K,
621 maxlen: StreamMaxlen,
622 ) -> RedisResult<RV> {
623 cmd("XTRIM").arg(key).arg(maxlen).query(self)
624 }
625}
626
627impl<T> StreamCommands for T where T: ConnectionLike {}