1use core::time;
4
5use crate::types::{RangeIdx, TimestampId, FetchType, FetchParams, FetchResult, PendingParams, PendingEntry, StreamId, Entry};
6use crate::queue::Queue;
7
8use redis::{RedisError, FromRedisValue};
9
10pub struct FetchIter<'a> {
19 params: FetchParams<'a>,
20 queue: Queue,
21}
22
23impl<'a> FetchIter<'a> {
24 #[inline(always)]
25 pub fn new(params: FetchParams<'a>, queue: Queue) -> Self {
27 Self {
28 params,
29 queue
30 }
31 }
32
33 #[inline(always)]
34 pub fn set_count(&mut self, count: usize) {
36 self.params.count = count;
37 }
38
39 #[inline(always)]
40 pub fn set_cursor(&mut self, typ: FetchType) {
42 self.params.typ = typ;
43 }
44
45 #[inline(always)]
46 pub fn set_timeout(&mut self, timeout: time::Duration) {
50 self.params.timeout = Some(timeout);
51 }
52
53 pub async fn next<T: FromRedisValue>(&mut self) -> Result<FetchResult<T>, RedisError> {
60 let result = self.queue.fetch(&self.params).await?;
61 match self.params.typ {
62 FetchType::New => (),
63 FetchType::Pending | FetchType::After(_) => {
64 if let Some(entry) = result.entries.last() {
65 self.params.typ = FetchType::After(entry.id);
66 }
67 }
68 }
69 Ok(result)
70 }
71
72 pub async fn next_entries<T: FromRedisValue>(&mut self) -> Result<Vec<Entry<T>>, RedisError> {
81 let result = self.queue.fetch_entries(&self.params).await?;
82 match self.params.typ {
83 FetchType::New => (),
84 FetchType::Pending | FetchType::After(_) => {
85 if let Some(entry) = result.entries.last() {
86 self.params.typ = FetchType::After(entry.id);
87 }
88 }
89 }
90
91 Ok(result.entries)
92 }
93}
94
95pub struct PendingIter<'a> {
97 params: PendingParams<'a>,
98 queue: Queue,
99}
100
101impl<'a> PendingIter<'a> {
102 #[inline(always)]
103 pub fn new(params: PendingParams<'a>, queue: Queue) -> Self {
105 Self {
106 params,
107 queue
108 }
109 }
110
111 #[inline(always)]
112 pub fn set_cursor(&mut self, id: StreamId) {
114 self.params.range.start = RangeIdx::ExcludeId(id);
115 }
116
117 #[inline(always)]
118 pub fn set_idle(&mut self, duration: time::Duration) {
120 self.params.idle = Some(duration);
121 }
122
123 pub async fn set_range_until_now(&mut self) -> Result<(), RedisError> {
125 let timestamp = self.queue.time().await?;
126 self.params.range.end = RangeIdx::Timestamp(TimestampId::new(timestamp));
127 Ok(())
128 }
129
130 pub async fn next(&mut self) -> Result<Vec<PendingEntry>, RedisError> {
132 let result = self.queue.pending(&self.params).await?;
133
134 if let Some(entry) = result.last() {
135 self.params.range.start = RangeIdx::ExcludeId(entry.id);
136 }
137
138 Ok(result)
139 }
140}