use core::time;
use crate::types::{RangeIdx, TimestampId, FetchType, FetchParams, FetchResult, PendingParams, PendingEntry, StreamId, Entry};
use crate::queue::Queue;
use redis::{RedisError, FromRedisValue};
/// Stateful helper that performs repeated fetches against a [`Queue`].
///
/// It owns the [`FetchParams`] used for every request, so the setters and the
/// cursor updates done by `next`/`next_entries` persist between calls.
pub struct FetchIter<'a> {
    /// Parameters handed to the queue on every fetch; updated by the setters
    /// and when the cursor advances.
    params: FetchParams<'a>,
    /// Queue the fetch requests are issued against.
    queue: Queue,
}
impl<'a> FetchIter<'a> {
    /// Builds a fetch iterator from the initial `params` and the `queue` to query.
    #[inline(always)]
    pub fn new(params: FetchParams<'a>, queue: Queue) -> Self {
        Self { params, queue }
    }

    /// Overwrites the `count` field of the stored fetch parameters.
    ///
    /// The new value is used by every subsequent fetch.
    #[inline(always)]
    pub fn set_count(&mut self, count: usize) {
        self.params.count = count;
    }

    /// Replaces the fetch type of the stored parameters.
    ///
    /// The fetch type doubles as the cursor: `next`/`next_entries` rewrite it
    /// to `FetchType::After` as entries are consumed, so calling this resets
    /// whatever position those methods had recorded.
    #[inline(always)]
    pub fn set_cursor(&mut self, typ: FetchType) {
        self.params.typ = typ;
    }

    /// Stores `timeout` in the fetch parameters for subsequent fetches.
    #[inline(always)]
    pub fn set_timeout(&mut self, timeout: time::Duration) {
        self.params.timeout = Some(timeout);
    }

    /// Performs one fetch with the current parameters and returns the full result.
    ///
    /// When the fetch type is `Pending` or `After(_)` and the result contains
    /// at least one entry, the fetch type becomes `After(id)` of the last
    /// returned entry, so the following call continues from there. With
    /// `FetchType::New`, or when no entries came back, the parameters are
    /// left untouched.
    ///
    /// # Errors
    ///
    /// Propagates any `RedisError` returned by the queue; the cursor is not
    /// modified in that case.
    pub async fn next<T: FromRedisValue>(&mut self) -> Result<FetchResult<T>, RedisError> {
        let fetched = self.queue.fetch(&self.params).await?;

        // Exhaustive on purpose: a new `FetchType` variant must decide
        // explicitly whether it moves the cursor.
        let moves_cursor = match self.params.typ {
            FetchType::New => false,
            FetchType::Pending | FetchType::After(_) => true,
        };

        if moves_cursor {
            if let Some(last) = fetched.entries.last() {
                self.params.typ = FetchType::After(last.id);
            }
        }

        Ok(fetched)
    }

    /// Performs one fetch with the current parameters and returns only the entries.
    ///
    /// Cursor handling is identical to [`FetchIter::next`]: for `Pending` and
    /// `After(_)` the fetch type becomes `After(id)` of the last returned
    /// entry, while `New` and empty results leave it unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any `RedisError` returned by the queue; the cursor is not
    /// modified in that case.
    pub async fn next_entries<T: FromRedisValue>(&mut self) -> Result<Vec<Entry<T>>, RedisError> {
        let fetched = self.queue.fetch_entries(&self.params).await?;

        // Exhaustive on purpose: a new `FetchType` variant must decide
        // explicitly whether it moves the cursor.
        let moves_cursor = match self.params.typ {
            FetchType::New => false,
            FetchType::Pending | FetchType::After(_) => true,
        };

        if moves_cursor {
            if let Some(last) = fetched.entries.last() {
                self.params.typ = FetchType::After(last.id);
            }
        }

        Ok(fetched.entries)
    }
}
/// Stateful helper that pages through pending entries of a [`Queue`].
///
/// It owns the [`PendingParams`] used for every request, so range and idle
/// adjustments as well as the cursor moved by `next` persist between calls.
pub struct PendingIter<'a> {
    /// Parameters handed to the queue on every pending query; updated by the
    /// setters and when the cursor advances.
    params: PendingParams<'a>,
    /// Queue the pending queries are issued against.
    queue: Queue,
}
impl<'a> PendingIter<'a> {
    /// Builds a pending iterator from the initial `params` and the `queue` to query.
    #[inline(always)]
    pub fn new(params: PendingParams<'a>, queue: Queue) -> Self {
        Self { params, queue }
    }

    /// Moves the start of the query range to just after `id`.
    ///
    /// The start is stored as `RangeIdx::ExcludeId(id)`.
    #[inline(always)]
    pub fn set_cursor(&mut self, id: StreamId) {
        self.params.range.start = RangeIdx::ExcludeId(id);
    }

    /// Stores `duration` as the `idle` value of the query parameters.
    #[inline(always)]
    pub fn set_idle(&mut self, duration: time::Duration) {
        self.params.idle = Some(duration);
    }

    /// Sets the end of the query range to the time currently reported by the queue.
    ///
    /// The value returned by `Queue::time` is wrapped in a `TimestampId` and
    /// stored as `RangeIdx::Timestamp`.
    ///
    /// # Errors
    ///
    /// Propagates the `RedisError` from the time query; the range is left
    /// unchanged in that case.
    pub async fn set_range_until_now(&mut self) -> Result<(), RedisError> {
        let now = TimestampId::new(self.queue.time().await?);
        self.params.range.end = RangeIdx::Timestamp(now);
        Ok(())
    }

    /// Runs one pending query with the current parameters.
    ///
    /// If any entries are returned, the range start becomes
    /// `RangeIdx::ExcludeId(id)` of the last one, so the following call
    /// continues after it. An empty result leaves the range unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any `RedisError` returned by the queue; the cursor is not
    /// modified in that case.
    pub async fn next(&mut self) -> Result<Vec<PendingEntry>, RedisError> {
        let pending = self.queue.pending(&self.params).await?;

        if let Some(last_id) = pending.last().map(|entry| entry.id) {
            self.params.range.start = RangeIdx::ExcludeId(last_id);
        }

        Ok(pending)
    }
}