use core::{fmt, time, cmp};
use core::future::Future;
use std::borrow::Cow;
use redis::{RedisError, FromRedisValue};
use crate::Queue;
use crate::iters::{PendingIter, FetchIter};
use crate::types::{StreamId, Range, RangeIdx, FetchParams, PendingParams, FetchType, Entry};
pub mod dispatch;
mod run;
pub use run::{RunParams, manage};
mod utils;
pub use utils::*;
/// Errors produced while validating a manager configuration or preparing the queue group.
#[derive(Debug)]
pub enum ConfigError {
/// The queue group name is empty.
QueueGroupNameMissing,
/// The queue group name is not a valid ASCII string.
QueueGroupNameInvalid,
/// The queue manager (consumer) name is empty.
QueueManagerNameMissing,
/// The queue manager (consumer) name is not a valid ASCII string.
QueueManagerNameInvalid,
/// The queue manager poll time is not a valid positive value.
QueueManagerPollTimeInvalid,
/// The queue manager max pending time is not a valid positive value.
QueueManagerMaxPendingTimeInvalid,
/// An error reported by Redis.
Redis(RedisError),
}
impl fmt::Display for ConfigError {
    /// Writes the human-readable description of the error into `fmt`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant except `Redis` has a fixed message, so pick the text first and
        // write it in a single place; `Redis` needs formatting and returns early.
        let message = match self {
            Self::Redis(error) => return write!(fmt, "Redis error: {error}"),
            Self::QueueGroupNameMissing => "Queue group name is empty",
            Self::QueueGroupNameInvalid => "Queue group name is not valid ASCII string.",
            Self::QueueManagerNameMissing => "Queue manager name is empty.",
            Self::QueueManagerNameInvalid => "Queue manager name is not valid ASCII string.",
            Self::QueueManagerPollTimeInvalid => {
                "Queue manager poll time is not valid positive integer."
            }
            Self::QueueManagerMaxPendingTimeInvalid => {
                "Queue manager max pending time is not valid positive integer."
            }
        };
        fmt.write_str(message)
    }
}
impl From<RedisError> for ConfigError {
#[inline(always)]
fn from(value: RedisError) -> Self {
Self::Redis(value)
}
}
impl std::error::Error for ConfigError {
    /// Returns the wrapped Redis error for [`ConfigError::Redis`]; `None` for every other variant.
    #[inline]
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Variants are listed explicitly so that adding a new one forces a decision here.
        match self {
            Self::Redis(error) => Some(error),
            Self::QueueGroupNameMissing
            | Self::QueueGroupNameInvalid
            | Self::QueueManagerNameMissing
            | Self::QueueManagerNameInvalid
            | Self::QueueManagerPollTimeInvalid
            | Self::QueueManagerMaxPendingTimeInvalid => None,
        }
    }
}
/// Role of a consumer, derived from the numeric suffix of its name.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum ConsumerKind {
    /// The name does not end in a numeric index.
    Single,
    /// The name ends in index `0`.
    Main,
    /// The name ends in a non-zero index.
    Extra,
}

impl ConsumerKind {
    /// Classifies a consumer by the text following the last `-` in `name`.
    ///
    /// When `name` contains no `-`, the whole name is examined. A suffix that parses as
    /// `u32` yields [`ConsumerKind::Main`] for `0` and [`ConsumerKind::Extra`] otherwise;
    /// anything that does not parse yields [`ConsumerKind::Single`].
    #[inline]
    pub fn determine(name: &str) -> Self {
        let suffix = match name.rsplit_once('-') {
            Some((_, tail)) => tail,
            None => name,
        };

        match suffix.parse::<u32>() {
            Ok(0) => Self::Main,
            Ok(_) => Self::Extra,
            Err(_) => Self::Single,
        }
    }
}
/// Settings supplied to `Manager::new`.
#[derive(Clone)]
pub struct ManagerConfig {
/// Queue group name; `Manager::new` rejects an empty or non-ASCII value.
pub group: Cow<'static, str>,
/// Consumer name used when querying the group; `Manager::new` rejects an empty or non-ASCII value.
pub consumer: Cow<'static, str>,
/// Role of this consumer; forwarded to the queue trimming task.
pub kind: ConsumerKind,
/// Timeout applied when fetching new tasks; `Manager::new` rejects zero.
pub poll_time: time::Duration,
/// Idle threshold used when listing expired pending tasks; `Manager::new` rejects zero.
pub max_pending_time: time::Duration,
}
/// Pairs a [`Queue`] handle with the [`ManagerConfig`] used to read tasks from it.
#[derive(Clone)]
pub struct Manager {
/// Queue the manager operates on.
queue: Queue,
/// Configuration accepted by `Manager::new`.
config: ManagerConfig,
}
impl Manager {
/// Validates `config` and creates the queue group before returning the manager.
///
/// # Errors
///
/// Checks run in this order and the first failure is returned:
/// zero `poll_time`, zero `max_pending_time`, empty group name, non-ASCII group name,
/// empty consumer name, non-ASCII consumer name. A failure of `Queue::create_group`
/// is returned as [`ConfigError::Redis`].
pub async fn new(queue: Queue, config: ManagerConfig) -> Result<Self, ConfigError> {
    if config.poll_time.is_zero() {
        return Err(ConfigError::QueueManagerPollTimeInvalid);
    }
    if config.max_pending_time.is_zero() {
        return Err(ConfigError::QueueManagerMaxPendingTimeInvalid);
    }

    let group: &str = config.group.as_ref();
    if group.is_empty() {
        return Err(ConfigError::QueueGroupNameMissing);
    }
    if !group.is_ascii() {
        return Err(ConfigError::QueueGroupNameInvalid);
    }

    let consumer: &str = config.consumer.as_ref();
    if consumer.is_empty() {
        return Err(ConfigError::QueueManagerNameMissing);
    }
    if !consumer.is_ascii() {
        return Err(ConfigError::QueueManagerNameInvalid);
    }

    // Only touch Redis once the configuration is known to be acceptable.
    queue.create_group(&config.group).await?;

    Ok(Self { queue, config })
}
#[inline(always)]
pub fn queue(&self) -> &Queue {
&self.queue
}
#[inline(always)]
pub fn config(&self) -> &ManagerConfig {
&self.config
}
/// Returns how many poll intervals fit into the maximum pending time.
///
/// The value is `max_pending_time / poll_time` rounded to the nearest integer and is
/// never less than 1. When `max_pending_time` does not exceed `poll_time` the result is 1.
pub fn max_pending_retry_count(&self) -> u64 {
    let poll_time = self.config.poll_time;
    let max_pending_time = self.config.max_pending_time;

    if max_pending_time > poll_time {
        let ratio = max_pending_time.as_secs_f64() / poll_time.as_secs_f64();
        // Clamp from below only: taking the minimum with 1 would cap every result at 1
        // and throw the computed ratio away.
        cmp::max(ratio.round() as u64, 1)
    } else {
        1
    }
}
/// Creates an iterator over the pending tasks of the configured group and consumer.
///
/// When `last_id` is given the range starts after that id (exclusive); otherwise it is
/// unbounded on both ends. No idle filter is applied. `count` is forwarded unchanged as
/// `PendingParams::count`.
pub fn pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
    let start = last_id.map_or(RangeIdx::Any, RangeIdx::ExcludeId);
    let config = &self.config;

    PendingIter::new(
        PendingParams {
            group: config.group.as_ref(),
            consumer: Some(config.consumer.as_ref()),
            range: Range {
                start,
                end: RangeIdx::Any,
            },
            idle: None,
            count,
        },
        self.queue.clone(),
    )
}
/// Creates an iterator over pending tasks filtered by the configured `max_pending_time`.
///
/// Built the same way as a plain pending query for the configured group and consumer,
/// except that `idle` is set to `max_pending_time`. When `last_id` is given the range
/// starts after that id (exclusive). `count` is forwarded unchanged as
/// `PendingParams::count`.
pub fn expired_pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
    let start = last_id.map_or(RangeIdx::Any, RangeIdx::ExcludeId);
    let config = &self.config;

    PendingIter::new(
        PendingParams {
            group: config.group.as_ref(),
            consumer: Some(config.consumer.as_ref()),
            range: Range {
                start,
                end: RangeIdx::Any,
            },
            idle: Some(config.max_pending_time),
            count,
        },
        self.queue.clone(),
    )
}
/// Creates an iterator that fetches new tasks for the configured group and consumer.
///
/// The fetch type is `FetchType::New` and the configured `poll_time` is used as the
/// timeout. `count` is forwarded unchanged as `FetchParams::count`.
pub fn fetch_new_tasks(&self, count: usize) -> FetchIter {
    let config = &self.config;

    FetchIter::new(
        FetchParams {
            group: config.group.as_ref(),
            consumer: config.consumer.as_ref(),
            typ: FetchType::New,
            count,
            timeout: Some(config.poll_time),
        },
        self.queue.clone(),
    )
}
/// Looks up a single entry by `id`.
///
/// A one-element fetch is issued with the cursor set to `FetchType::After(id.prev())`.
/// The returned entry is accepted only when its id equals `id`; otherwise, or when
/// nothing is returned, the result is `Ok(None)`.
///
/// NOTE(review): presumably `StreamId::prev` yields the id immediately before `id`, so
/// the fetch lands on `id` itself when it exists — confirm against `StreamId`.
///
/// # Errors
///
/// Propagates the error returned by the underlying fetch.
pub async fn get_pending_by_id<T: FromRedisValue>(&self, id: StreamId) -> Result<Option<Entry<T>>, RedisError> {
    let mut fetcher = self.fetch_new_tasks(1);
    fetcher.set_cursor(FetchType::After(id.prev()));

    let mut entries = fetcher.next_entries().await?;
    match entries.pop() {
        Some(entry) if entry.id == id => Ok(Some(entry)),
        _ => Ok(None),
    }
}
#[inline]
pub async fn consume_tasks(&self, tasks: &[StreamId]) -> Result<usize, RedisError> {
self.queue.consume(&self.config.group, tasks).await
}
/// Builds the queue trimming future for this manager.
///
/// The future is `trim_queue_task` called with a clone of the queue, the configured
/// consumer kind and `retry_num`, wrapped in `tokio::task::unconstrained`. It owns its
/// inputs, so it does not borrow from `self`.
#[inline(always)]
fn trim_queue(&self, retry_num: u32) -> impl Future<Output = ()> + Send + 'static {
    let queue = self.queue.clone();
    let kind = self.config.kind;
    tokio::task::unconstrained(trim_queue_task(queue, kind, retry_num))
}
}