1use redis::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value};
2
3use std::collections::HashMap;
4use std::io::{Error, ErrorKind};
5
/// Length limit that is serialized as a `MAXLEN` argument group.
///
/// Each variant carries the threshold that is written after the comparison
/// operator.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum StreamMaxlen {
    /// Serialized as `MAXLEN = <threshold>`.
    Equals(usize),
    /// Serialized as `MAXLEN ~ <threshold>`.
    // NOTE(review): the variant name looks like a typo of "Approx"; it is part
    // of the public API, so it is left as-is here.
    Aprrox(usize),
}
16
17impl ToRedisArgs for StreamMaxlen {
18 fn write_redis_args<W>(&self, out: &mut W)
19 where
20 W: ?Sized + RedisWrite,
21 {
22 let (ch, val) = match *self {
23 StreamMaxlen::Equals(v) => ("=", v),
24 StreamMaxlen::Aprrox(v) => ("~", v),
25 };
26 out.write_arg("MAXLEN".as_bytes());
27 out.write_arg(ch.as_bytes());
28 val.write_redis_args(out);
29 }
30}
31
/// Builder for the optional arguments of a claim command.
///
/// Every option starts unset (see `Default`). Each builder method consumes
/// the value and returns the updated one, so calls can be chained.
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
    /// Value written after the `IDLE` keyword, when set.
    idle: Option<usize>,
    /// Value written after the `TIME` keyword, when set.
    time: Option<usize>,
    /// Value written after the `RETRYCOUNT` keyword, when set.
    retry: Option<usize>,
    /// Whether the `FORCE` flag is written.
    force: bool,
    /// Whether the `JUSTID` flag is written.
    justid: bool,
}

impl StreamClaimOptions {
    /// Sets the `IDLE` value to `ms`.
    pub fn idle(self, ms: usize) -> Self {
        Self {
            idle: Some(ms),
            ..self
        }
    }

    /// Sets the `TIME` value to `ms_time`.
    pub fn time(self, ms_time: usize) -> Self {
        Self {
            time: Some(ms_time),
            ..self
        }
    }

    /// Sets the `RETRYCOUNT` value to `count`.
    pub fn retry(self, count: usize) -> Self {
        Self {
            retry: Some(count),
            ..self
        }
    }

    /// Enables the `FORCE` flag.
    pub fn with_force(self) -> Self {
        Self {
            force: true,
            ..self
        }
    }

    /// Enables the `JUSTID` flag.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }
}
77
78impl ToRedisArgs for StreamClaimOptions {
79 fn write_redis_args<W>(&self, out: &mut W)
80 where
81 W: ?Sized + RedisWrite,
82 {
83 if let Some(ref ms) = self.idle {
84 out.write_arg("IDLE".as_bytes());
85 out.write_arg(format!("{}", ms).as_bytes());
86 }
87 if let Some(ref ms_time) = self.time {
88 out.write_arg("TIME".as_bytes());
89 out.write_arg(format!("{}", ms_time).as_bytes());
90 }
91 if let Some(ref count) = self.retry {
92 out.write_arg("RETRYCOUNT".as_bytes());
93 out.write_arg(format!("{}", count).as_bytes());
94 }
95 if self.force {
96 out.write_arg("FORCE".as_bytes());
97 }
98 if self.justid {
99 out.write_arg("JUSTID".as_bytes());
100 }
101 }
102}
103
104#[derive(Default, Debug)]
109pub struct StreamReadOptions {
110 block: Option<usize>,
112 count: Option<usize>,
114 noack: Option<bool>,
116 group: Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>,
119}
120
121impl StreamReadOptions {
122 pub fn read_only(&self) -> bool {
123 self.group.is_none()
124 }
125
126 pub fn noack(mut self) -> Self {
127 self.noack = Some(true);
128 self
129 }
130
131 pub fn block(mut self, ms: usize) -> Self {
132 self.block = Some(ms);
133 self
134 }
135
136 pub fn count(mut self, n: usize) -> Self {
137 self.count = Some(n);
138 self
139 }
140
141 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
142 mut self,
143 group_name: GN,
144 consumer_name: CN,
145 ) -> Self {
146 self.group = Some((
147 ToRedisArgs::to_redis_args(&group_name),
148 ToRedisArgs::to_redis_args(&consumer_name),
149 ));
150 self
151 }
152}
153
154impl ToRedisArgs for StreamReadOptions {
155 fn write_redis_args<W>(&self, out: &mut W)
156 where
157 W: ?Sized + RedisWrite,
158 {
159 if let Some(ref ms) = self.block {
160 out.write_arg("BLOCK".as_bytes());
161 out.write_arg(format!("{}", ms).as_bytes());
162 }
163
164 if let Some(ref n) = self.count {
165 out.write_arg("COUNT".as_bytes());
166 out.write_arg(format!("{}", n).as_bytes());
167 }
168
169 if let Some(ref group) = self.group {
170 if let Some(true) = self.noack {
172 out.write_arg("NOACK".as_bytes());
173 }
174
175 out.write_arg("GROUP".as_bytes());
176 for i in &group.0 {
177 out.write_arg(i);
178 }
179 for i in &group.1 {
180 out.write_arg(i);
181 }
182 }
183 }
184}
185
/// Parsed reply of a stream read.
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
    /// One `StreamKey` per (key, entries) pair found in the reply.
    pub keys: Vec<StreamKey>,
}
195
/// Parsed reply holding a list of stream entries.
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
    /// One `StreamId` per row of the reply.
    pub ids: Vec<StreamId>,
}
209
/// Parsed reply holding the entries returned by a claim.
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
    /// One `StreamId` per row of the reply.
    pub ids: Vec<StreamId>,
}
218
219#[derive(Debug, Clone)]
224pub enum StreamPendingReply {
225 Empty,
226 Data(StreamPendingData),
227}
228
229impl Default for StreamPendingReply {
230 fn default() -> StreamPendingReply {
231 StreamPendingReply::Empty
232 }
233}
234
235impl StreamPendingReply {
236 pub fn count(&self) -> usize {
237 match self {
238 StreamPendingReply::Empty => 0,
239 StreamPendingReply::Data(x) => x.count,
240 }
241 }
242}
243
/// Payload of a non-empty `StreamPendingReply`.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
    /// Number of pending entries reported by the reply.
    pub count: usize,
    /// Start id reported by the reply.
    pub start_id: String,
    /// End id reported by the reply.
    pub end_id: String,
    /// Per-consumer rows of the reply.
    pub consumers: Vec<StreamInfoConsumer>,
}
252
/// Parsed reply listing individual pending entries.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
    /// One `StreamPendingId` per row of the reply.
    pub ids: Vec<StreamPendingId>,
}
263
/// Parsed stream information reply.
///
/// Fields whose entry is missing from the reply keep their `Default` value.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
    /// Value of the `last-generated-id` entry.
    pub last_generated_id: String,
    /// Radix tree statistic reported for the stream.
    pub radix_tree_keys: usize,
    /// Value of the `groups` entry.
    pub groups: usize,
    /// Value of the `length` entry.
    pub length: usize,
    /// Entry parsed from the `first-entry` value.
    pub first_entry: StreamId,
    /// Entry parsed from the `last-entry` value.
    pub last_entry: StreamId,
}
277
/// Parsed reply listing consumers.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
    /// One `StreamInfoConsumer` per row of the reply.
    pub consumers: Vec<StreamInfoConsumer>,
}
286
/// Parsed reply listing consumer groups.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
    /// One `StreamInfoGroup` per row of the reply.
    pub groups: Vec<StreamInfoGroup>,
}
295
/// Information about a single consumer.
///
/// Fields that a reply does not provide keep their `Default` value.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Consumer name.
    pub name: String,
    /// Pending count reported for this consumer.
    pub pending: usize,
    /// Value of the `idle` entry, when the reply has one.
    pub idle: usize,
}
306
/// Information about a single consumer group.
///
/// Fields whose entry is missing from the reply keep their `Default` value.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
    /// Value of the `name` entry.
    pub name: String,
    /// Value of the `consumers` entry.
    pub consumers: usize,
    /// Value of the `pending` entry.
    pub pending: usize,
    /// Value of the `last-delivered-id` entry.
    pub last_delivered_id: String,
}
318
/// One row of a `StreamPendingCountReply`; the fields are filled from the
/// row's four values in this order.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
    /// First value of the row.
    pub id: String,
    /// Second value of the row.
    pub consumer: String,
    /// Third value of the row.
    // NOTE(review): presumably milliseconds since the last delivery — confirm
    // against the server documentation.
    pub last_delivered_ms: usize,
    /// Fourth value of the row.
    pub times_delivered: usize,
}
327
328#[derive(Default, Debug, Clone)]
330pub struct StreamKey {
331 pub key: String,
332 pub ids: Vec<StreamId>,
333}
334
335impl StreamKey {
336 pub fn just_ids(&self) -> Vec<&String> {
337 self.ids.iter().map(|msg| &msg.id).collect::<Vec<&String>>()
338 }
339}
340
341#[derive(Default, Debug, Clone)]
343pub struct StreamId {
344 pub id: String,
345 pub map: HashMap<String, Value>,
346}
347
348impl StreamId {
349 pub fn from_bulk_value(v: &Value) -> RedisResult<Self> {
350 let mut stream_id = StreamId::default();
351 match *v {
352 Value::Bulk(ref values) => {
353 if let Some(v) = values.get(0) {
354 stream_id.id = from_redis_value(&v)?;
355 }
356 if let Some(v) = values.get(1) {
357 stream_id.map = from_redis_value(&v)?;
358 }
359 }
360 _ => {}
361 }
362
363 Ok(stream_id)
364 }
365
366 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
367 match self.find(&key) {
368 Some(ref x) => from_redis_value(*x).ok(),
369 None => None,
370 }
371 }
372
373 pub fn find(&self, key: &&str) -> Option<&Value> {
374 self.map.get(*key)
375 }
376
377 pub fn contains_key(&self, key: &&str) -> bool {
378 self.find(key).is_some()
379 }
380
381 pub fn len(&self) -> usize {
382 self.map.len()
383 }
384}
385
386impl FromRedisValue for StreamReadReply {
387 fn from_redis_value(v: &Value) -> RedisResult<Self> {
388 let rows: Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>> =
389 from_redis_value(v)?;
390 let mut reply = StreamReadReply::default();
391 for row in &rows {
392 for (key, entry) in row.iter() {
393 let mut k = StreamKey::default();
394 k.key = key.to_owned();
395 for id_row in entry {
396 let mut i = StreamId::default();
397 for (id, map) in id_row.iter() {
398 i.id = id.to_owned();
399 i.map = map.to_owned();
400 }
401 k.ids.push(i);
402 }
403 reply.keys.push(k);
404 }
405 }
406 Ok(reply)
407 }
408}
409
410impl FromRedisValue for StreamRangeReply {
411 fn from_redis_value(v: &Value) -> RedisResult<Self> {
412 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
413 let mut reply = StreamRangeReply::default();
414 for row in &rows {
415 let mut i = StreamId::default();
416 for (id, map) in row.iter() {
417 i.id = id.to_owned();
418 i.map = map.to_owned();
419 }
420 reply.ids.push(i);
421 }
422 Ok(reply)
423 }
424}
425
426impl FromRedisValue for StreamClaimReply {
427 fn from_redis_value(v: &Value) -> RedisResult<Self> {
428 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
429 let mut reply = StreamClaimReply::default();
430 for row in &rows {
431 let mut i = StreamId::default();
432 for (id, map) in row.iter() {
433 i.id = id.to_owned();
434 i.map = map.to_owned();
435 }
436 reply.ids.push(i);
437 }
438 Ok(reply)
439 }
440}
441
442impl FromRedisValue for StreamPendingReply {
443 fn from_redis_value(v: &Value) -> RedisResult<Self> {
444 let parts: (usize, Option<String>, Option<String>, Vec<Vec<String>>) = from_redis_value(v)?;
445 let count = parts.0.to_owned() as usize;
446
447 if count == 0 {
448 Ok(StreamPendingReply::Empty)
449 } else {
450 let mut result = StreamPendingData::default();
451
452 let start_id = match parts.1.to_owned() {
453 Some(start) => Ok(start),
454 None => Err(Error::new(
455 ErrorKind::Other,
456 "IllegalState: Non-zero pending expects start id",
457 )),
458 }?;
459
460 let end_id = match parts.2.to_owned() {
461 Some(end) => Ok(end),
462 None => Err(Error::new(
463 ErrorKind::Other,
464 "IllegalState: Non-zero pending expects end id",
465 )),
466 }?;
467
468 result.count = count;
469 result.start_id = start_id;
470 result.end_id = end_id;
471
472 for consumer in &parts.3 {
473 let mut info = StreamInfoConsumer::default();
474 info.name = consumer[0].to_owned();
475 if let Ok(v) = consumer[1].to_owned().parse::<usize>() {
476 info.pending = v;
477 }
478 result.consumers.push(info);
479 }
480
481 Ok(StreamPendingReply::Data(result))
482 }
483 }
484}
485
486impl FromRedisValue for StreamPendingCountReply {
487 fn from_redis_value(v: &Value) -> RedisResult<Self> {
488 let parts: Vec<Vec<(String, String, usize, usize)>> = from_redis_value(v)?;
489 let mut reply = StreamPendingCountReply::default();
490 for row in &parts {
491 let mut p = StreamPendingId::default();
492 p.id = row[0].0.to_owned();
493 p.consumer = row[0].1.to_owned();
494 p.last_delivered_ms = row[0].2.to_owned();
495 p.times_delivered = row[0].3.to_owned();
496 reply.ids.push(p);
497 }
498 Ok(reply)
499 }
500}
501
502impl FromRedisValue for StreamInfoStreamReply {
503 fn from_redis_value(v: &Value) -> RedisResult<Self> {
504 let map: HashMap<String, Value> = from_redis_value(v)?;
505 let mut reply = StreamInfoStreamReply::default();
506 if let Some(v) = &map.get("last-generated-id") {
507 reply.last_generated_id = from_redis_value(v)?;
508 }
509 if let Some(v) = &map.get("radix-tree-nodes") {
510 reply.radix_tree_keys = from_redis_value(v)?;
511 }
512 if let Some(v) = &map.get("groups") {
513 reply.groups = from_redis_value(v)?;
514 }
515 if let Some(v) = &map.get("length") {
516 reply.length = from_redis_value(v)?;
517 }
518 if let Some(v) = &map.get("first-entry") {
519 reply.first_entry = StreamId::from_bulk_value(v)?;
520 }
521 if let Some(v) = &map.get("last-entry") {
522 reply.last_entry = StreamId::from_bulk_value(v)?;
523 }
524 Ok(reply)
525 }
526}
527
528impl FromRedisValue for StreamInfoConsumersReply {
529 fn from_redis_value(v: &Value) -> RedisResult<Self> {
530 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
531 let mut reply = StreamInfoConsumersReply::default();
532 for map in consumers {
533 let mut c = StreamInfoConsumer::default();
534 if let Some(v) = &map.get("name") {
535 c.name = from_redis_value(v)?;
536 }
537 if let Some(v) = &map.get("pending") {
538 c.pending = from_redis_value(v)?;
539 }
540 if let Some(v) = &map.get("idle") {
541 c.idle = from_redis_value(v)?;
542 }
543 reply.consumers.push(c);
544 }
545
546 Ok(reply)
547 }
548}
549
550impl FromRedisValue for StreamInfoGroupsReply {
551 fn from_redis_value(v: &Value) -> RedisResult<Self> {
552 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
553 let mut reply = StreamInfoGroupsReply::default();
554 for map in groups {
555 let mut g = StreamInfoGroup::default();
556 if let Some(v) = &map.get("name") {
557 g.name = from_redis_value(v)?;
558 }
559 if let Some(v) = &map.get("pending") {
560 g.pending = from_redis_value(v)?;
561 }
562 if let Some(v) = &map.get("consumers") {
563 g.consumers = from_redis_value(v)?;
564 }
565 if let Some(v) = &map.get("last-delivered-id") {
566 g.last_delivered_id = from_redis_value(v)?;
567 }
568 reply.groups.push(g);
569 }
570 Ok(reply)
571 }
572}