1use core::{fmt, time, cmp};
4use core::str::FromStr;
5use core::convert::TryInto;
6
7use redis::{FromRedisValue, ErrorKind, Value, RedisResult, RedisError, RedisWrite, ToRedisArgs};
8
9use crate::queue::QueueConfig;
10
/// Redis command names and keywords used when assembling stream requests.
pub(crate) mod idents {
    /// Declares one public string constant per identifier; each constant holds the
    /// identifier's own spelling.
    macro_rules! define_term {
        ($($term:ident),+) => {
            $(
                pub const $term: &str = stringify!($term);
            )+
        };
    }

    // Command names and sub-commands.
    define_term!(TYPE, TIME, XLEN, XADD, XREADGROUP, XPENDING, XACK, XDEL, XINFO, XTRIM);
    define_term!(XGROUP, CREATE, MKSTREAM, BUSYGROUP, GROUPS);
    // Option keywords.
    define_term!(GROUP, COUNT, BLOCK, STREAMS, IDLE, MAXLEN, MINID);
}
24
25fn parse_redis_key(value: &redis::Value) -> Result<&str, RedisError> {
26 match value {
27 redis::Value::Data(ref data) => match core::str::from_utf8(data) {
28 Ok(key) => Ok(key),
29 Err(_) => Err((redis::ErrorKind::TypeError, "Non-UTF8 stream field's name").into()),
30 },
31 _ => Err((redis::ErrorKind::TypeError, "Invalid stream field's name").into()),
32 }
33}
34
/// Fills `$slot` from `$raw` when the field name matches, then moves on to the next pair.
///
/// The expansion fires only while `$slot` is still `None` and `$name` equals `$wanted`
/// ignoring ASCII case. On a hit the value is converted with
/// `FromRedisValue::from_redis_value`, a conversion error is propagated with `?`, and
/// `continue` jumps to the next iteration, so the macro has to be invoked inside a loop in a
/// function returning a compatible `Result`.
macro_rules! assign_field_if {
    ($slot:ident = $raw:ident IF $name:ident == $wanted:expr) => {
        if $slot.is_none() && $name.eq_ignore_ascii_case($wanted) {
            $slot = Some(FromRedisValue::from_redis_value($raw)?);
            continue;
        }
    };
}
43
44#[cold]
45#[inline(never)]
46fn unlikely_redis_error(kind: redis::ErrorKind, text: &'static str) -> RedisError {
47 (kind, text).into()
48}
49
/// Unwraps an `Option` held in the local variable `$field`.
///
/// When the variable is `None` the enclosing function returns a `TypeError` whose text is
/// `'<variable name>' is missing`.
macro_rules! unwrap_required_field {
    ($field:ident) => {
        if let Some(field) = $field {
            field
        } else {
            return Err(unlikely_redis_error(
                redis::ErrorKind::TypeError,
                concat!("'", stringify!($field), "' is missing"),
            ));
        }
    };
}
63
/// Kind of value stored under a Redis key, as named by the server.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum RedisType {
    /// Reported as `string`.
    String,
    /// Reported as `list`.
    List,
    /// Reported as `set`.
    Set,
    /// Reported as `zset`.
    ZSet,
    /// Reported as `hash`.
    Hash,
    /// Reported as `stream`.
    Stream,
    /// Reported as `none`.
    None,
}

impl RedisType {
    /// Maps a type name to its variant, ignoring ASCII case.
    ///
    /// Returns `Option::None` when the name is not one of the known type names.
    #[inline(always)]
    pub fn parse(value: &str) -> Option<Self> {
        const NAMES: [(&str, RedisType); 7] = [
            ("string", RedisType::String),
            ("list", RedisType::List),
            ("set", RedisType::Set),
            ("zset", RedisType::ZSet),
            ("hash", RedisType::Hash),
            ("stream", RedisType::Stream),
            ("none", RedisType::None),
        ];

        NAMES
            .iter()
            .find(|(name, _)| value.eq_ignore_ascii_case(name))
            .map(|&(_, kind)| kind)
    }
}
105
106impl FromRedisValue for RedisType {
107 fn from_redis_value(value: &Value) -> RedisResult<Self> {
108 match value {
109 Value::Bulk(_) => Err((ErrorKind::TypeError, "Not a single value").into()),
110 Value::Data(value) => match core::str::from_utf8(value) {
111 Ok(value) => match Self::parse(value) {
112 Some(result) => Ok(result),
113 None => Err((ErrorKind::TypeError, "Not a type").into()),
114 },
115 Err(_) => Err((ErrorKind::TypeError, "Not a string").into()),
116 },
117 Value::Nil => Err((ErrorKind::TypeError, "unexpected null").into()),
118 Value::Int(_) => Err((ErrorKind::TypeError, "unexpected Integer").into()),
119 Value::Okay => Err((ErrorKind::TypeError, "unexpected OK").into()),
120 Value::Status(response) => match Self::parse(response) {
121 Some(result) => Ok(result),
122 None => Err((ErrorKind::TypeError, "Not a type").into()),
123 },
124 }
125 }
126
127 fn from_byte_vec(vec: &[u8]) -> Option<Vec<Self>> {
128 match core::str::from_utf8(vec) {
129 Ok(value) => Self::parse(value).map(|val| vec![val]),
130 Err(_) => None,
131 }
132 }
133}
134
135#[derive(Debug, Copy, Clone)]
136pub enum TrimMethod {
138 MaxLen(u64),
140 MinId(StreamId),
144}
145
146impl ToRedisArgs for TrimMethod {
147 #[inline(always)]
148 fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
149 match self {
150 Self::MaxLen(threshold) => {
151 idents::MAXLEN.write_redis_args(out);
152 threshold.write_redis_args(out);
153 }
154 Self::MinId(id) => {
155 idents::MINID.write_redis_args(out);
156 id.write_redis_args(out);
157 }
158 }
159 }
160
161 #[inline(always)]
162 fn is_single_arg(&self) -> bool {
163 false
164 }
165}
166
/// Reasons a textual stream id can be rejected.
#[derive(Debug)]
pub enum StreamIdParseError {
    /// The input is not a stream id at all.
    InvalidType,
    /// The timestamp part is not a valid number.
    InvalidTimestamp,
    /// The sequence part is absent.
    MissingSequence,
    /// The sequence part is not a valid number.
    InvalidSequence,
}

impl StreamIdParseError {
    /// Static, human-readable description of the error.
    #[inline(always)]
    const fn as_str(&self) -> &'static str {
        match *self {
            Self::InvalidSequence => "Invalid sequence number",
            Self::MissingSequence => "Missing sequence part",
            Self::InvalidTimestamp => "Invalid timestamp",
            Self::InvalidType => "Not a valid stream id",
        }
    }
}

impl fmt::Display for StreamIdParseError {
    /// Writes the static description verbatim.
    #[inline(always)]
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = self.as_str();
        fmt.write_str(description)
    }
}

impl std::error::Error for StreamIdParseError {}
200
/// Stream id that carries only the millisecond timestamp part.
#[derive(Copy, Clone, PartialEq, Eq)]
#[repr(transparent)]
pub struct TimestampId {
    /// Milliseconds.
    timestamp: u64,
}

impl TimestampId {
    /// Creates an id from a duration, truncated to whole milliseconds.
    ///
    /// Durations whose millisecond count does not fit into `u64` saturate at `u64::MAX`.
    #[inline]
    pub fn new(timestamp: time::Duration) -> Self {
        let millis = timestamp.as_millis();
        let timestamp = millis.try_into().unwrap_or(u64::MAX);

        Self { timestamp }
    }
}
228
229impl fmt::Debug for TimestampId {
230 #[inline(always)]
231 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
232 let Self { timestamp } = self;
233 fmt::Debug::fmt(timestamp, fmt)
234 }
235}
236
237impl fmt::Display for TimestampId {
238 #[inline(always)]
239 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
240 let Self { timestamp } = self;
241 fmt.write_fmt(format_args!("{timestamp}-*"))
242 }
243}
244
245impl ToRedisArgs for TimestampId {
246 #[inline(always)]
247 fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
248 self.timestamp.write_redis_args(out)
249 }
250
251 #[inline(always)]
252 fn is_single_arg(&self) -> bool {
253 true
254 }
255}
256
/// Full stream entry id: a millisecond timestamp plus a sequence number.
///
/// Ids are ordered by timestamp first and by sequence second.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct StreamId {
    /// Milliseconds part of the id.
    timestamp: u64,
    /// Sequence number within the same millisecond.
    seq: u64,
}

impl StreamId {
    /// Returns the smallest possible id, `0-0`.
    #[inline(always)]
    pub const fn nil() -> Self {
        Self { timestamp: 0, seq: 0 }
    }

    /// Returns `true` when both parts are zero.
    #[inline(always)]
    pub const fn is_nil(&self) -> bool {
        self.timestamp == 0 && self.seq == 0
    }

    /// Returns the timestamp part as a duration in milliseconds.
    #[inline(always)]
    pub const fn as_timestamp(&self) -> time::Duration {
        time::Duration::from_millis(self.timestamp)
    }

    /// Returns the id that immediately follows this one.
    ///
    /// The sequence is incremented; when it is already `u64::MAX` the timestamp is incremented
    /// and the sequence restarts at zero. The largest possible id is returned unchanged.
    pub const fn next(&self) -> Self {
        if self.seq < u64::MAX {
            Self {
                timestamp: self.timestamp,
                seq: self.seq + 1,
            }
        } else if self.timestamp < u64::MAX {
            Self {
                timestamp: self.timestamp + 1,
                seq: 0,
            }
        } else {
            // Already the maximum id: saturate instead of wrapping around.
            Self {
                timestamp: self.timestamp,
                seq: self.seq,
            }
        }
    }

    /// Returns the id that immediately precedes this one.
    ///
    /// The sequence is decremented; when it is already zero the timestamp is decremented and
    /// the sequence becomes `u64::MAX`, so no id between the result and `self` is skipped.
    /// The nil id is returned unchanged.
    pub const fn prev(&self) -> Self {
        if self.seq > 0 {
            Self {
                timestamp: self.timestamp,
                seq: self.seq - 1,
            }
        } else if self.timestamp > 0 {
            // Borrow from the timestamp: the predecessor of `t-0` is the last id of `t - 1`.
            Self {
                timestamp: self.timestamp - 1,
                seq: u64::MAX,
            }
        } else {
            Self::nil()
        }
    }
}
327
328impl fmt::Debug for StreamId {
329 #[inline(always)]
330 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
331 let Self { timestamp, seq } = self;
332 fmt::Debug::fmt(&(timestamp, seq), fmt)
333 }
334}
335
336impl PartialOrd for StreamId {
337 #[allow(clippy::non_canonical_partial_ord_impl)]
338 #[inline(always)]
339 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
340 PartialOrd::partial_cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
341 }
342}
343
344impl Ord for StreamId {
345 #[inline(always)]
346 fn cmp(&self, other: &Self) -> cmp::Ordering {
347 Ord::cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
348 }
349}
350
351impl FromStr for StreamId {
352 type Err = StreamIdParseError;
353
354 fn from_str(data: &str) -> Result<Self, Self::Err> {
355 let mut split = data.split('-');
356 let timestamp = match split.next() {
357 Some(timestamp) => match timestamp.parse() {
358 Ok(timestamp) => timestamp,
359 Err(_) => {
360 return Err(StreamIdParseError::InvalidTimestamp);
361 }
362 },
363 None => return Err(StreamIdParseError::InvalidType),
364 };
365
366 let seq = match split.next() {
367 Some(seq) => match seq.parse() {
368 Ok(seq) => seq,
369 Err(_) => return Err(StreamIdParseError::InvalidSequence),
370 },
371 None => return Err(StreamIdParseError::MissingSequence),
372 };
373
374 Ok(Self { timestamp, seq })
375 }
376}
377
378impl fmt::Display for StreamId {
379 #[inline(always)]
380 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
381 let Self { timestamp, seq } = self;
382 fmt.write_fmt(format_args!("{timestamp}-{seq}"))
383 }
384}
385
386impl ToRedisArgs for StreamId {
387 #[inline(always)]
388 fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
389 const STREAM_MAX_SIZE: usize = 20 + 1 + 20;
390 let mut buf = str_buf::StrBuf::<STREAM_MAX_SIZE>::new();
391 let _ = fmt::Write::write_fmt(&mut buf, format_args!("{self}"));
392
393 out.write_arg_fmt(buf.as_str())
394 }
395
396 #[inline(always)]
397 fn is_single_arg(&self) -> bool {
398 true
399 }
400}
401
402impl FromRedisValue for StreamId {
403 fn from_redis_value(value: &Value) -> RedisResult<Self> {
404 match value {
405 Value::Data(data) => match core::str::from_utf8(data) {
406 Ok(data) => match data.parse() {
407 Ok(result) => Ok(result),
408 Err(error) => Err((redis::ErrorKind::InvalidClientConfig, error.as_str()).into()),
409 },
410 Err(_) => Err((redis::ErrorKind::TypeError, "Not a string").into()),
411 },
412 Value::Bulk(_) => Err((redis::ErrorKind::TypeError, "Not bulk instead of stream id").into()),
413 Value::Nil => Ok(StreamId::nil()),
414 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
415 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
416 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
417 }
418 }
419}
420
421#[derive(Debug)]
422pub struct GroupInfo {
424 pub name: String,
426 pub consumers: u64,
428 pub pending: u64,
430 pub last_delivered_id: StreamId,
432 }
438
439impl GroupInfo {
440 const USER_FIELD_NAME: &'static str = "name";
441 const USER_FIELD_CONSUMERS: &'static str = "consumers";
442 const USER_FIELD_PENDING: &'static str = "pending";
443 const USER_FIELD_LAST_DELIVERED_ID: &'static str = "last-delivered-id";
444}
445
446impl FromRedisValue for GroupInfo {
447 fn from_redis_value(value: &Value) -> RedisResult<Self> {
448 match value {
449 Value::Bulk(values) => {
451 let mut name = None;
452 let mut consumers = None;
453 let mut pending = None;
454 let mut last_delivered_id = None;
455
456 if values.len() < 8 {
457 return Err((
458 redis::ErrorKind::TypeError,
459 "Insufficient number of values returned. Need at least 8",
460 )
461 .into());
462 }
463
464 for pair in values.chunks(2) {
465 let key = parse_redis_key(&pair[0])?;
466 let value = &pair[1];
467
468 assign_field_if!(name = value IF key == Self::USER_FIELD_NAME);
469 assign_field_if!(consumers = value IF key == Self::USER_FIELD_CONSUMERS);
470 assign_field_if!(pending = value IF key == Self::USER_FIELD_PENDING);
471 assign_field_if!(last_delivered_id = value IF key == Self::USER_FIELD_LAST_DELIVERED_ID);
472 }
473
474 let name = unwrap_required_field!(name);
475 let consumers = unwrap_required_field!(consumers);
476 let pending = unwrap_required_field!(pending);
477 let last_delivered_id = unwrap_required_field!(last_delivered_id);
478 Ok(Self {
479 name,
480 consumers,
481 pending,
482 last_delivered_id,
483 })
484 }
485 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
486 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
487 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
488 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
489 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
490 }
491 }
492}
493
494pub struct PendingConsumerStat {
496 pub name: String,
498 pub no_ack_num: u64,
500}
501
502impl FromRedisValue for PendingConsumerStat {
503 fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
504 match value {
505 Value::Bulk(values) => {
506 if values.len() == 2 {
507 Ok(Self {
508 name: FromRedisValue::from_redis_value(&values[0])?,
509 no_ack_num: FromRedisValue::from_redis_value(&values[1])?,
510 })
511 } else {
512 Err((
513 redis::ErrorKind::TypeError,
514 "PendingConsumerStat array requires 2 elements",
515 )
516 .into())
517 }
518 }
519 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingConsumerStat array").into()),
520 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
521 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
522 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
523 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
524 }
525 }
526}
527
528pub struct PendingStats {
532 pub len: u64,
536 pub lowest_id: StreamId,
538 pub highest_id: StreamId,
540 pub consumers: Vec<PendingConsumerStat>,
542}
543
544impl FromRedisValue for PendingStats {
545 fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
546 match value {
547 Value::Bulk(values) => {
548 if values.len() == 4 {
549 Ok(Self {
550 len: FromRedisValue::from_redis_value(&values[0])?,
551 lowest_id: FromRedisValue::from_redis_value(&values[1])?,
552 highest_id: FromRedisValue::from_redis_value(&values[2])?,
553 consumers: FromRedisValue::from_redis_value(&values[3])?,
554 })
555 } else {
556 Err((redis::ErrorKind::TypeError, "PendingStats array requires 4 elements").into())
557 }
558 }
559 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingStats array").into()),
560 Value::Nil => Ok(Self {
563 len: 0,
564 lowest_id: StreamId::nil(),
565 highest_id: StreamId::nil(),
566 consumers: Vec::new(),
567 }),
568 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
569 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
570 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
571 }
572 }
573}
574
575#[derive(Debug, PartialEq, Eq, Clone)]
576pub struct EntryValue<T> {
578 pub id: uuid::Uuid,
582 pub payload: T,
584}
585
586impl<T> EntryValue<T> {
587 const USER_FIELD_ID: &'static str = "id";
588 const USER_FIELD_DATA: &'static str = "payload";
589}
590
591impl<T: FromRedisValue> FromRedisValue for EntryValue<T> {
592 fn from_redis_value(value: &Value) -> RedisResult<Self> {
593 match value {
594 Value::Bulk(values) => {
596 let mut id = None;
597 let mut payload = None;
598
599 for pair in values.chunks(2) {
600 let key = parse_redis_key(&pair[0])?;
601 let value = &pair[1];
602
603 if id.is_none() && key.eq_ignore_ascii_case(Self::USER_FIELD_ID) {
604 id = Some(value);
605 }
606 assign_field_if!(payload = value IF key == Self::USER_FIELD_DATA);
607 }
608
609 let id = match id {
610 Some(id) => match id {
611 Value::Data(data) => {
612 let data: [u8; 16] = match data.as_slice().try_into() {
613 Ok(data) => data,
614 Err(_) => return Err((redis::ErrorKind::TypeError, "id field is not 16 bytes").into()),
615 };
616 let data = u128::from_le_bytes(data);
617 uuid::Uuid::from_u128(data)
618 }
619 _ => return Err((redis::ErrorKind::TypeError, "id field is not bytes").into()),
620 },
621 None => return Err((redis::ErrorKind::TypeError, "Missing id field").into()),
622 };
623 let payload = unwrap_required_field!(payload);
624 Ok(Self { id, payload })
625 }
626 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream values").into()),
627 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
628 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
629 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
630 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
631 }
632 }
633}
634
635impl<T: ToRedisArgs> ToRedisArgs for EntryValue<T> {
636 #[inline(always)]
637 fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
638 Self::USER_FIELD_ID.write_redis_args(out);
639 out.write_arg(&self.id.as_u128().to_le_bytes());
640
641 Self::USER_FIELD_DATA.write_redis_args(out);
642 self.payload.write_redis_args(out);
643 }
644
645 #[inline(always)]
646 fn is_single_arg(&self) -> bool {
648 false
649 }
650}
651
652#[derive(Copy, Clone)]
653pub enum RangeIdx {
655 Any,
657 Timestamp(TimestampId),
659 Id(StreamId),
661 ExcludeId(StreamId),
665}
666
667impl fmt::Debug for RangeIdx {
668 #[inline(always)]
669 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
670 match self {
671 Self::Any => fmt.write_str("Any"),
672 Self::Timestamp(time) => fmt::Debug::fmt(time, fmt),
673 Self::Id(id) => fmt::Debug::fmt(id, fmt),
674 Self::ExcludeId(id) => {
675 fmt.write_str("Exclude(")?;
676 fmt::Debug::fmt(id, fmt)?;
677 fmt.write_str(")")
678 }
679 }
680 }
681}
682
683#[derive(Debug, Copy, Clone)]
684pub struct Range {
686 pub start: RangeIdx,
688 pub end: RangeIdx,
690}
691
692impl ToRedisArgs for Range {
693 #[inline(always)]
694 fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
695 let Self { start, end } = self;
696
697 match start {
698 RangeIdx::Any => "-".write_redis_args(out),
699 RangeIdx::Timestamp(id) => id.write_redis_args(out),
700 RangeIdx::Id(id) => id.write_redis_args(out),
701 RangeIdx::ExcludeId(id) => id.next().write_redis_args(out),
702 }
703
704 match end {
705 RangeIdx::Any => "+".write_redis_args(out),
706 RangeIdx::Timestamp(id) => id.write_redis_args(out),
707 RangeIdx::Id(id) => id.write_redis_args(out),
708 RangeIdx::ExcludeId(id) => id.prev().write_redis_args(out),
709 }
710 }
711
712 #[inline(always)]
713 fn is_single_arg(&self) -> bool {
714 false
715 }
716}
717
718pub struct PendingParams<'a> {
720 pub group: &'a str,
730 pub range: Range,
732 pub idle: Option<time::Duration>,
736 pub consumer: Option<&'a str>,
738 pub count: usize,
740}
741
742pub(crate) struct PendingParamsConfig<'a> {
743 pub config: &'a QueueConfig,
744 pub params: &'a PendingParams<'a>,
745}
746
747impl<'a> ToRedisArgs for PendingParamsConfig<'a> {
748 #[inline(always)]
749 fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
750 let Self { config, params } = self;
751
752 config.stream.as_ref().write_redis_args(out);
753 params.group.write_redis_args(out);
754
755 if let Some(idle) = params.idle {
756 let idle: u64 = match idle.as_millis().try_into() {
757 Ok(idle) => idle,
758 Err(_) => u64::max_value(),
759 };
760
761 idents::IDLE.as_bytes().write_redis_args(out);
762 idle.write_redis_args(out);
763 }
764
765 params.range.write_redis_args(out);
766 params.count.write_redis_args(out);
767
768 if let Some(consumer) = ¶ms.consumer {
769 consumer.write_redis_args(out)
770 }
771 }
772
773 #[inline(always)]
774 fn is_single_arg(&self) -> bool {
775 false
776 }
777}
778
779#[derive(Debug)]
780pub struct PendingEntry {
782 pub id: StreamId,
784 pub consumer: String,
786 pub last_delivered: time::Duration,
788 pub count: u64,
790}
791
792impl FromRedisValue for PendingEntry {
793 fn from_redis_value(value: &Value) -> RedisResult<Self> {
794 match value {
795 Value::Bulk(values) => {
797 if values.len() == 4 {
798 Ok(Self {
799 id: StreamId::from_redis_value(&values[0])?,
800 consumer: String::from_redis_value(&values[1])?,
801 last_delivered: time::Duration::from_millis(u64::from_redis_value(&values[2])?),
802 count: u64::from_redis_value(&values[3])?,
803 })
804 } else {
805 Err((
806 redis::ErrorKind::TypeError,
807 "Invalid number of values in PendingEntry, should be 4",
808 )
809 .into())
810 }
811 }
812 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
813 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
814 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
815 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
816 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
817 }
818 }
819
820 #[inline]
821 fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
822 None
823 }
824}
825
826pub enum FetchType {
828 New,
832 Pending,
836 After(StreamId),
838}
839
840impl ToRedisArgs for FetchType {
841 #[inline(always)]
842 fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
843 match self {
844 Self::New => out.write_arg(b">"),
845 Self::Pending => out.write_arg(b"0"),
846 Self::After(id) => id.write_redis_args(out),
847 }
848 }
849
850 #[inline(always)]
851 fn is_single_arg(&self) -> bool {
852 true
853 }
854}
855
856pub struct FetchParams<'a> {
858 pub group: &'a str,
868 pub consumer: &'a str,
876 pub typ: FetchType,
878 pub count: usize,
882 pub timeout: Option<time::Duration>,
884}
885
886pub(crate) struct FetchParamsConfig<'a> {
887 pub config: &'a QueueConfig,
888 pub params: &'a FetchParams<'a>,
889}
890
891impl<'a> ToRedisArgs for FetchParamsConfig<'a> {
892 #[inline(always)]
893 fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
894 let Self { config, params } = self;
895
896 out.write_arg(idents::GROUP.as_bytes());
897 params.group.write_redis_args(out);
898 params.consumer.write_redis_args(out);
899 if params.count > 0 {
900 out.write_arg(idents::COUNT.as_bytes());
901 params.count.write_redis_args(out);
902 }
903 if let Some(timeout) = params.timeout {
904 out.write_arg(idents::BLOCK.as_bytes());
905 (timeout.as_millis() as u64).write_redis_args(out);
906 }
907
908 out.write_arg(idents::STREAMS.as_bytes());
909 config.stream.as_ref().write_redis_args(out);
910
911 params.typ.write_redis_args(out);
912 }
913
914 #[inline(always)]
915 fn is_single_arg(&self) -> bool {
916 false
917 }
918}
919
920#[derive(Debug, PartialEq, Eq, Clone)]
921pub struct Entry<T> {
923 pub id: StreamId,
925 pub value: EntryValue<T>,
927}
928
929impl<T: FromRedisValue> FromRedisValue for Entry<T> {
930 fn from_redis_value(value: &Value) -> RedisResult<Self> {
931 match value {
932 Value::Bulk(values) => {
934 if values.len() == 2 {
938 Ok(Self {
939 id: StreamId::from_redis_value(&values[0])?,
940 value: EntryValue::<T>::from_redis_value(&values[1])?,
941 })
942 } else {
943 Err((
944 redis::ErrorKind::TypeError,
945 "Invalid number of values in entry, should be 2",
946 )
947 .into())
948 }
949 }
950 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
951 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
952 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
953 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
954 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
955 }
956 }
957
958 fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
959 None
960 }
961}
962
963pub struct FetchResult<T> {
965 pub stream: String,
967 pub entries: Vec<Entry<T>>,
969}
970
971impl<T: FromRedisValue> FromRedisValue for FetchResult<T> {
972 fn from_redis_value(value: &Value) -> RedisResult<Self> {
973 match value {
974 Value::Bulk(values) => {
976 if values.len() == 2 {
980 Ok(Self {
981 stream: String::from_redis_value(&values[0])?,
982 entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
983 })
984 } else {
985 Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
986 }
987 }
988 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
989 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
990 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
991 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
992 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
993 }
994 }
995
996 fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
997 None
998 }
999}
1000
1001pub struct FetchEntries<T> {
1003 pub entries: Vec<Entry<T>>,
1005}
1006
1007impl<T: FromRedisValue> FromRedisValue for FetchEntries<T> {
1008 fn from_redis_value(value: &Value) -> RedisResult<Self> {
1009 match value {
1010 Value::Bulk(values) => {
1012 if values.len() == 2 {
1016 Ok(Self {
1017 entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
1018 })
1019 } else {
1020 Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
1021 }
1022 }
1023 Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
1024 Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
1025 Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
1026 Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
1027 Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
1028 }
1029 }
1030
1031 fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
1032 None
1033 }
1034}