redis_queue/
iters.rs

//! Pseudo-iterator types for querying the `Queue`
2
3use core::time;
4
5use crate::types::{RangeIdx, TimestampId, FetchType, FetchParams, FetchResult, PendingParams, PendingEntry, StreamId, Entry};
6use crate::queue::Queue;
7
8use redis::{RedisError, FromRedisValue};
9
10///Iterator to fetch messages from within queue.
11///
12///If fetch parameters are to query new messages, it will only return freshly added messages.
13///Once these messages are fetched, they are considered pending and no longer returned by this
14///iterator
15///
16///If fetch parameter specifies to return pending messages within queue, then iterator will resume
17///after last message id.
18pub struct FetchIter<'a> {
19    params: FetchParams<'a>,
20    queue: Queue,
21}
22
23impl<'a> FetchIter<'a> {
24    #[inline(always)]
25    ///Creates new iterator
26    pub fn new(params: FetchParams<'a>, queue: Queue) -> Self {
27        Self {
28            params,
29            queue
30        }
31    }
32
33    #[inline(always)]
34    ///Sets number of items to fetch
35    pub fn set_count(&mut self, count: usize) {
36        self.params.count = count;
37    }
38
39    #[inline(always)]
40    ///Sets cursor to position where to iterate from.
41    pub fn set_cursor(&mut self, typ: FetchType) {
42        self.params.typ = typ;
43    }
44
45    #[inline(always)]
46    ///Sets time waiting to fetch data.
47    ///
48    ///Redis will return reply within this time, not necessary immediately even if data is available.
49    pub fn set_timeout(&mut self, timeout: time::Duration) {
50        self.params.timeout = Some(timeout);
51    }
52
53    ///Performs fetch, returning messages depending on `FetchParams::typ`.
54    ///
55    ///If pending messages are fetched, moves cursor to last pending message after fetched messages.
56    ///
57    ///We fetch our task as raw bytes to ensure we can always get it, regardless how it was
58    ///serialized.
59    pub async fn next<T: FromRedisValue>(&mut self) -> Result<FetchResult<T>, RedisError> {
60        let result = self.queue.fetch(&self.params).await?;
61        match self.params.typ {
62            FetchType::New => (),
63            FetchType::Pending | FetchType::After(_) => {
64                if let Some(entry) = result.entries.last() {
65                    self.params.typ = FetchType::After(entry.id);
66                }
67            }
68        }
69        Ok(result)
70    }
71
72    ///Performs fetch, returning messages depending on `FetchParams::typ`.
73    ///
74    ///If pending messages are fetched, moves cursor to last pending message after fetched messages.
75    ///
76    ///We fetch our task as raw bytes to ensure we can always get it, regardless how it was
77    ///serialized.
78    ///
79    ///Differently from `next` it returns only list of entries
80    pub async fn next_entries<T: FromRedisValue>(&mut self) -> Result<Vec<Entry<T>>, RedisError> {
81        let result = self.queue.fetch_entries(&self.params).await?;
82        match self.params.typ {
83            FetchType::New => (),
84            FetchType::Pending | FetchType::After(_) => {
85                if let Some(entry) = result.entries.last() {
86                    self.params.typ = FetchType::After(entry.id);
87                }
88            }
89        }
90
91        Ok(result.entries)
92    }
93}
94
95///Iterator over expired pending messages.
96pub struct PendingIter<'a> {
97    params: PendingParams<'a>,
98    queue: Queue,
99}
100
101impl<'a> PendingIter<'a> {
102    #[inline(always)]
103    ///Creates new iterator
104    pub fn new(params: PendingParams<'a>, queue: Queue) -> Self {
105        Self {
106            params,
107            queue
108        }
109    }
110
111    #[inline(always)]
112    ///Sets cursor position to start after specified `id`
113    pub fn set_cursor(&mut self, id: StreamId) {
114        self.params.range.start = RangeIdx::ExcludeId(id);
115    }
116
117    #[inline(always)]
118    ///Sets IDLE timeout, essentially filtering out messages not older than specified `duration`
119    pub fn set_idle(&mut self, duration: time::Duration) {
120        self.params.idle = Some(duration);
121    }
122
123    ///Sets end range to task scheduled for execution right now.
124    pub async fn set_range_until_now(&mut self) -> Result<(), RedisError> {
125        let timestamp = self.queue.time().await?;
126        self.params.range.end = RangeIdx::Timestamp(TimestampId::new(timestamp));
127        Ok(())
128    }
129
130    ///Attempts to peek at pending messages.
131    pub async fn next(&mut self) -> Result<Vec<PendingEntry>, RedisError> {
132        let result = self.queue.pending(&self.params).await?;
133
134        if let Some(entry) = result.last() {
135            self.params.range.start = RangeIdx::ExcludeId(entry.id);
136        }
137
138        Ok(result)
139    }
140}