1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
//! Pseudo iterators types to query Queue
use core::time;
use crate::types::{RangeIdx, TimestampId, FetchType, FetchParams, FetchResult, PendingParams, PendingEntry, StreamId, Entry};
use crate::queue::Queue;
use redis::{RedisError, FromRedisValue};
///Iterator to fetch messages from within queue.
///
///If fetch parameters are to query new messages, it will only return freshly added messages.
///Once these messages are fetched, they are considered pending and no longer returned by this
///iterator
///
///If fetch parameter specifies to return pending messages within queue, then iterator will resume
///after last message id.
pub struct FetchIter<'a> {
params: FetchParams<'a>,
queue: Queue,
}
impl<'a> FetchIter<'a> {
#[inline(always)]
///Creates new iterator
pub fn new(params: FetchParams<'a>, queue: Queue) -> Self {
Self {
params,
queue
}
}
#[inline(always)]
///Sets number of items to fetch
pub fn set_count(&mut self, count: usize) {
self.params.count = count;
}
#[inline(always)]
///Sets cursor to position where to iterate from.
pub fn set_cursor(&mut self, typ: FetchType) {
self.params.typ = typ;
}
#[inline(always)]
///Sets time waiting to fetch data.
///
///Redis will return reply within this time, not necessary immediately even if data is available.
pub fn set_timeout(&mut self, timeout: time::Duration) {
self.params.timeout = Some(timeout);
}
///Performs fetch, returning messages depending on `FetchParams::typ`.
///
///If pending messages are fetched, moves cursor to last pending message after fetched messages.
///
///We fetch our task as raw bytes to ensure we can always get it, regardless how it was
///serialized.
pub async fn next<T: FromRedisValue>(&mut self) -> Result<FetchResult<T>, RedisError> {
let result = self.queue.fetch(&self.params).await?;
match self.params.typ {
FetchType::New => (),
FetchType::Pending | FetchType::After(_) => {
if let Some(entry) = result.entries.last() {
self.params.typ = FetchType::After(entry.id);
}
}
}
Ok(result)
}
///Performs fetch, returning messages depending on `FetchParams::typ`.
///
///If pending messages are fetched, moves cursor to last pending message after fetched messages.
///
///We fetch our task as raw bytes to ensure we can always get it, regardless how it was
///serialized.
///
///Differently from `next` it returns only list of entries
pub async fn next_entries<T: FromRedisValue>(&mut self) -> Result<Vec<Entry<T>>, RedisError> {
let result = self.queue.fetch_entries(&self.params).await?;
match self.params.typ {
FetchType::New => (),
FetchType::Pending | FetchType::After(_) => {
if let Some(entry) = result.entries.last() {
self.params.typ = FetchType::After(entry.id);
}
}
}
Ok(result.entries)
}
}
///Iterator over expired pending messages.
pub struct PendingIter<'a> {
params: PendingParams<'a>,
queue: Queue,
}
impl<'a> PendingIter<'a> {
#[inline(always)]
///Creates new iterator
pub fn new(params: PendingParams<'a>, queue: Queue) -> Self {
Self {
params,
queue
}
}
#[inline(always)]
///Sets cursor position to start after specified `id`
pub fn set_cursor(&mut self, id: StreamId) {
self.params.range.start = RangeIdx::ExcludeId(id);
}
#[inline(always)]
///Sets IDLE timeout, essentially filtering out messages not older than specified `duration`
pub fn set_idle(&mut self, duration: time::Duration) {
self.params.idle = Some(duration);
}
///Sets end range to task scheduled for execution right now.
pub async fn set_range_until_now(&mut self) -> Result<(), RedisError> {
let timestamp = self.queue.time().await?;
self.params.range.end = RangeIdx::Timestamp(TimestampId::new(timestamp));
Ok(())
}
///Attempts to peek at pending messages.
pub async fn next(&mut self) -> Result<Vec<PendingEntry>, RedisError> {
let result = self.queue.pending(&self.params).await?;
if let Some(entry) = result.last() {
self.params.range.start = RangeIdx::ExcludeId(entry.id);
}
Ok(result)
}
}