use std::sync::Arc;
use crate::{
error::QueueError, BackOffJobOptions, FailedDetails, JobMetrics, JobState, JobToken,
RemoveOnCompletionOrFailure, Repeat, Trace,
};
#[cfg(feature = "redis-store")]
use redis::{FromRedisValue, ParsingError, ToRedisArgs, ToSingleRedisArg, Value};
use serde::{Deserialize, Serialize};
/// Outcome of processing a job: either the failure details or the returned
/// value together with the job's metrics.
///
/// The enum is serialized untagged, so the variant name does not appear in the
/// serialized form. Variant order therefore matters for deserialization:
/// `Failed` is declared (and tried) before `Success`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum ProcessedResult<R> {
/// Processing failed; carries the failure details.
Failed(FailedDetails),
/// Processing succeeded with a value of type `R` and the job's metrics.
/// The `Debug` format string below renders only field `_1` (the `JobMetrics`).
#[debug("{_1:?}")]
Success(R, JobMetrics),
}
/// A single job field together with its value.
///
/// The key string for each variant is given by `JobField::name`. The enum is
/// serialized untagged, so only the inner value is written and the declaration
/// order of the variants matters when deserializing.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum JobField<R> {
/// The job's token.
Token(JobToken),
/// The processing outcome (returned value or failure details).
Payload(ProcessedResult<R>),
/// Value of the `processedOn` field (presumably a timestamp; units not shown here).
ProcessedOn(u64),
/// Value of the `finishedOn` field (presumably a timestamp; units not shown here).
FinishedOn(u64),
/// The job's state.
State(JobState),
/// Trace information, keyed as `stackTrace`.
BackTrace(Trace),
}
impl<R> JobField<R> {
    /// Returns the key string that identifies this field.
    ///
    /// A [`JobField::Payload`] is keyed as `"returnedValue"` when it holds a
    /// successful result and as `"failedReason"` when it holds a failure;
    /// every other variant has exactly one fixed key.
    pub const fn name(&self) -> &'static str {
        match self {
            Self::Payload(ProcessedResult::Success(..)) => "returnedValue",
            Self::Payload(ProcessedResult::Failed(..)) => "failedReason",
            Self::Token(..) => "token",
            Self::State(..) => "state",
            Self::ProcessedOn(..) => "processedOn",
            Self::FinishedOn(..) => "finishedOn",
            Self::BackTrace(..) => "stackTrace",
        }
    }
}
use derive_more::{Debug, Display};
/// Identifies one of the collections (keys) belonging to a queue.
///
/// The `Display` output is the trailing segment that `to_collection_name`
/// appends after `prefix:name:`; variants carrying a `#[display(...)]`
/// attribute override the derived rendering. `PartialOrd`/`Ord` are derived,
/// so the declaration order of the variants defines their ordering.
#[derive(Display, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub enum CollectionSuffix {
Active,
Completed,
Delayed,
Stalled,
Prioritized,
PriorityCounter,
Id,
Meta,
Events,
Wait,
Paused,
Failed,
Marker,
/// A single job, rendered as just its numeric id.
#[display("{_0}")]
Job(u64),
/// Renders as the empty string, leaving only the `prefix:name:` part.
#[display("")]
Prefix,
/// The lock of a job, rendered as `<id>:lock`.
#[display("{_0}:lock")]
Lock(u64),
#[display("stalled_check")]
StalledCheck,
#[display("worker_metrics")]
WorkerMetrics,
}
impl CollectionSuffix {
    /// Number of bits the discriminant is shifted left by inside a tag.
    const DISCRIMINANT_SHIFT: u32 = 56;
    /// Mask selecting the low 56 bits of a tag, which hold the id carried by
    /// [`Self::Job`] and [`Self::Lock`].
    const PAYLOAD_MASK: u64 = 0x00FF_FFFF_FFFF_FFFF;

    /// Builds the full collection name `prefix:name:suffix`.
    ///
    /// The whole string is lowercased, including `prefix` and `name`.
    #[must_use]
    pub fn to_collection_name(&self, prefix: &str, name: &str) -> String {
        format!("{prefix}:{name}:{self}").to_lowercase()
    }

    /// Stable numeric code of the variant, stored in the top byte of a tag.
    ///
    /// Every code listed here must have a matching arm in [`Self::from_tag`].
    const fn discriminant(&self) -> u8 {
        match self {
            Self::Active => 1,
            Self::Completed => 2,
            Self::Delayed => 3,
            Self::Stalled => 4,
            Self::Prioritized => 5,
            Self::PriorityCounter => 6,
            Self::Id => 7,
            Self::Meta => 8,
            Self::Events => 9,
            Self::Wait => 10,
            Self::Paused => 11,
            Self::Failed => 12,
            Self::Marker => 13,
            Self::Job(_) => 14,
            Self::Prefix => 15,
            Self::Lock(_) => 16,
            Self::StalledCheck => 17,
            Self::WorkerMetrics => 18,
        }
    }

    /// Packs the suffix into a `u64`: the discriminant occupies the top byte
    /// and, for [`Self::Job`] / [`Self::Lock`], the id occupies the low 56 bits.
    ///
    /// Ids wider than 56 bits are truncated to their low 56 bits.
    #[must_use]
    pub fn tag(&self) -> u64 {
        let top = u64::from(self.discriminant()) << Self::DISCRIMINANT_SHIFT;
        match self {
            Self::Job(id) | Self::Lock(id) => top | (id & Self::PAYLOAD_MASK),
            Self::Active
            | Self::Completed
            | Self::Delayed
            | Self::Stalled
            | Self::Prioritized
            | Self::PriorityCounter
            | Self::Id
            | Self::Meta
            | Self::Events
            | Self::Wait
            | Self::Paused
            | Self::Failed
            | Self::Marker
            | Self::Prefix
            | Self::StalledCheck
            | Self::WorkerMetrics => top,
        }
    }

    /// Returns the tag as eight big-endian bytes (discriminant byte first).
    #[must_use]
    pub fn to_bytes(&self) -> [u8; 8] {
        self.tag().to_be_bytes()
    }

    /// Decodes a tag produced by [`Self::tag`].
    ///
    /// Returns `None` when the top byte is not a known discriminant. For
    /// variants without an id the low 56 bits are ignored.
    #[must_use]
    pub const fn from_tag(tag: u64) -> Option<Self> {
        // After shifting out 56 bits only 8 bits remain, so the cast is lossless.
        let disc = (tag >> Self::DISCRIMINANT_SHIFT) as u8;
        let payload = tag & Self::PAYLOAD_MASK;
        Some(match disc {
            1 => Self::Active,
            2 => Self::Completed,
            3 => Self::Delayed,
            4 => Self::Stalled,
            5 => Self::Prioritized,
            6 => Self::PriorityCounter,
            7 => Self::Id,
            8 => Self::Meta,
            9 => Self::Events,
            10 => Self::Wait,
            11 => Self::Paused,
            12 => Self::Failed,
            13 => Self::Marker,
            14 => Self::Job(payload),
            15 => Self::Prefix,
            16 => Self::Lock(payload),
            17 => Self::StalledCheck,
            // Previously missing: `WorkerMetrics` tags could not be decoded.
            18 => Self::WorkerMetrics,
            _ => return None,
        })
    }
}
impl From<JobState> for CollectionSuffix {
    /// Selects the collection suffix used for a given job state.
    ///
    /// `Wait`, `Active`, `Paused`, `Completed`, `Failed`, `Delayed` and
    /// `Prioritized` map to the suffix of the same name. The remaining states
    /// are folded onto other suffixes: `Resumed` → `Active`,
    /// `Stalled` → `Paused`, `Progress` → `Prefix`, `Processing` → `Meta`
    /// and `Obliterated` → `Events`.
    fn from(state: JobState) -> Self {
        match state {
            // States whose suffix carries the same name.
            JobState::Wait => Self::Wait,
            JobState::Completed => Self::Completed,
            JobState::Failed => Self::Failed,
            JobState::Delayed => Self::Delayed,
            JobState::Prioritized => Self::Prioritized,
            // States that share a suffix with another state.
            JobState::Resumed | JobState::Active => Self::Active,
            // NOTE(review): `Stalled` resolves to `Paused` although a
            // `Stalled` suffix exists — confirm this is intended.
            JobState::Paused | JobState::Stalled => Self::Paused,
            // States without a same-named suffix.
            JobState::Progress => Self::Prefix,
            JobState::Processing => Self::Meta,
            JobState::Obliterated => Self::Events,
        }
    }
}
#[cfg(feature = "redis-store")]
use redis::RedisWrite;
#[cfg(feature = "redis-store")]
impl ToRedisArgs for CollectionSuffix {
    /// Writes the suffix as a single argument: its `Display` text, lowercased.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        let rendered = self.to_string();
        let lowered = rendered.to_lowercase();
        out.write_arg_fmt(lowered);
    }
}
// Marker impl with no methods. `CollectionSuffix::write_redis_args` in this
// file emits exactly one argument (a single `write_arg_fmt` call).
#[cfg(feature = "redis-store")]
impl ToSingleRedisArg for CollectionSuffix {}
// Marker impl with no methods. `QueueEventMode::write_redis_args` in this
// file likewise emits exactly one argument (the mode as a `u8`).
#[cfg(feature = "redis-store")]
impl ToSingleRedisArg for QueueEventMode {}
/// Event mode of a queue, stored as a `u8` (`#[repr(u8)]`).
///
/// The explicit discriminants are the values accepted by `TryFrom<u8>` and
/// written by the `ToRedisArgs` impl in this file. `Stream` (0) is the default.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, Eq, PartialEq)]
#[repr(u8)]
pub enum QueueEventMode {
/// Numeric value 1. Presumably events are delivered via pub/sub — confirm against the consumers.
PubSub = 1,
/// Numeric value 0; the default mode. Presumably events are delivered via a stream — confirm against the consumers.
#[default]
Stream = 0,
}
impl TryFrom<u8> for QueueEventMode {
    type Error = QueueError;

    /// Converts a raw mode value into a [`QueueEventMode`].
    ///
    /// # Errors
    ///
    /// Returns [`QueueError::UnKnownEventMode`] for any value other than
    /// `0` (`Stream`) or `1` (`PubSub`).
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let mode = match value {
            0 => Self::Stream,
            1 => Self::PubSub,
            _ => return Err(QueueError::UnKnownEventMode),
        };
        Ok(mode)
    }
}
#[cfg(feature = "redis-store")]
impl FromRedisValue for QueueEventMode {
    /// Reads an event mode from a Redis value.
    ///
    /// `Nil` is treated as `0`. Any other value must parse as a `u8`
    /// (otherwise the parsing error is returned). A parsed number that is not
    /// a known mode falls back to the default mode instead of failing.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let raw: u8 = match v {
            Value::Nil => 0,
            other => u8::from_redis_value(other)?,
        };
        let mode: Self = raw.try_into().unwrap_or_default();
        Ok(mode)
    }
}
#[cfg(feature = "redis-store")]
impl ToRedisArgs for QueueEventMode {
    /// Writes the mode as a single argument holding its `u8` discriminant.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + redis::RedisWrite,
    {
        // The enum is `#[repr(u8)]`, so the cast yields the declared discriminant.
        out.write_arg_fmt(*self as u8);
    }
}
/// Borrowed retry configuration: either back-off options or repeat options.
///
/// Both variants hold a reference, so a value cannot outlive the options it
/// points to (lifetime `'a`).
#[derive(Clone, Debug)]
pub enum RetryOptions<'a> {
/// Wraps back-off options; produced by `From<&BackOffJobOptions>`.
Failed(&'a BackOffJobOptions),
/// Wraps repeat options; produced by `From<&Repeat>`.
WithRepeat(&'a Repeat),
}
impl<'a> From<&'a BackOffJobOptions> for RetryOptions<'a> {
fn from(value: &'a BackOffJobOptions) -> Self {
RetryOptions::Failed(value)
}
}
impl<'a> From<&'a Repeat> for RetryOptions<'a> {
fn from(value: &'a Repeat) -> Self {
Self::WithRepeat(value)
}
}
/// Queue-level options.
///
/// The `Default` impl in this file sets `attempts` to 1, `event_mode` to
/// `Some(QueueEventMode::default())` and every other field to `None`.
#[derive(Debug, Clone)]
pub struct QueueOpts {
/// Removal policy applied on failure, if any (presumably for failed jobs — confirm against the consumer).
pub remove_on_fail: Option<RemoveOnCompletionOrFailure>,
/// Removal policy applied on completion, if any (presumably for completed jobs — confirm against the consumer).
pub remove_on_complete: Option<RemoveOnCompletionOrFailure>,
/// Attempt count; defaults to 1 (presumably the maximum number of tries per job — confirm).
pub attempts: u64,
/// Back-off options used when none are given elsewhere, if any (assumed from the name — confirm).
pub default_backoff: Option<BackOffJobOptions>,
/// Event mode of the queue, if set.
pub event_mode: Option<QueueEventMode>,
/// Repeat options, if any.
pub repeat: Option<Repeat>,
}
impl Default for QueueOpts {
    /// Default options: a single attempt, the default event mode, and no
    /// removal policy, back-off, or repeat configuration.
    fn default() -> Self {
        Self {
            remove_on_fail: None,
            remove_on_complete: None,
            attempts: 1,
            default_backoff: None,
            event_mode: Some(QueueEventMode::default()),
            repeat: None,
        }
    }
}
use crossbeam::atomic::AtomicCell;
/// A `u64` held in an `AtomicCell` behind an `Arc`, so clones of the handle
/// refer to the same underlying value.
pub type Counter = Arc<AtomicCell<u64>>;
fn create_counter(count: u64) -> Counter {
Counter::new(count.into())
}
/// Counters and flags describing a queue.
///
/// Every field is an `Arc`-wrapped `AtomicCell`, so cloning a `QueueMetrics`
/// yields a handle to the same underlying cells, and values can be changed
/// through `&self`. The derived `Default` has all counters at 0, `is_paused`
/// `false`, and the default event mode.
#[derive(Debug, Clone, Default)]
pub struct QueueMetrics {
/// Last id value; `is_idle` and `all_jobs_completed` require it to be non-zero.
pub last_id: Counter,
/// Read by `workers_idle`, which is true when this is 0.
pub processing: Counter,
/// Counted as pending work by `queue_has_work`.
pub prioritized: Counter,
/// Read by `has_active_jobs`.
pub active: Counter,
/// Counted as pending work by `queue_has_work`.
pub stalled: Counter,
/// Counted as pending work by `queue_has_work`; also read by `has_delayed`.
pub delayed: Counter,
/// Compared against `last_id` by `all_jobs_completed`.
pub completed: Counter,
/// Failed-job counter (not read by any predicate in this file).
pub failed: Counter,
/// Paused-job counter (not read by any predicate in this file).
pub paused: Counter,
/// Counted as pending work by `queue_has_work`.
pub waiting: Counter,
/// Flag returned by `queue_is_paused`. Not copied by `update`.
pub is_paused: Arc<AtomicCell<bool>>,
/// Event mode of the queue.
pub event_mode: Arc<AtomicCell<QueueEventMode>>,
}
impl QueueMetrics {
    /// Returns `true` when at least one id has been issued, the number of
    /// completed jobs equals `last_id`, nothing is active, and the queue is
    /// idle (see [`Self::is_idle`]).
    #[must_use]
    pub fn all_jobs_completed(&self) -> bool {
        let issued = self.last_id.load();
        if issued == 0 {
            return false;
        }
        if self.completed.load() != issued {
            return false;
        }
        if self.active.load() != 0 {
            return false;
        }
        self.is_idle()
    }

    /// Creates metrics from explicit values, each stored in its own fresh cell.
    ///
    /// Note that the argument order differs from the field declaration order.
    // One plain value per field; grouping them would change the public signature.
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub fn new(
        last_id: u64,
        processing: u64,
        active: u64,
        stalled: u64,
        completed: u64,
        delayed: u64,
        prioritized: u64,
        paused: u64,
        failed: u64,
        waiting: u64,
        is_paused: bool,
        event_mode: QueueEventMode,
    ) -> Self {
        Self {
            last_id: create_counter(last_id),
            processing: create_counter(processing),
            prioritized: create_counter(prioritized),
            active: create_counter(active),
            stalled: create_counter(stalled),
            delayed: create_counter(delayed),
            completed: create_counter(completed),
            failed: create_counter(failed),
            paused: create_counter(paused),
            waiting: create_counter(waiting),
            is_paused: Arc::new(AtomicCell::new(is_paused)),
            event_mode: Arc::new(AtomicCell::new(event_mode)),
        }
    }

    /// Overwrites every counter and the event mode with the values currently
    /// held by `other`.
    ///
    /// NOTE(review): `is_paused` is not copied, so it keeps its previous
    /// value — confirm this is intended.
    pub fn update(&self, other: &Self) {
        let counters = [
            (&self.paused, &other.paused),
            (&self.completed, &other.completed),
            (&self.stalled, &other.stalled),
            (&self.active, &other.active),
            (&self.last_id, &other.last_id),
            (&self.delayed, &other.delayed),
            (&self.failed, &other.failed),
            (&self.waiting, &other.waiting),
            (&self.processing, &other.processing),
            (&self.prioritized, &other.prioritized),
        ];
        for (target, source) in counters {
            target.store(source.load());
        }
        self.event_mode.store(other.event_mode.load());
    }

    /// Returns `true` when the delayed counter is non-zero.
    #[must_use]
    pub fn has_delayed(&self) -> bool {
        self.delayed.load() != 0
    }

    /// Returns `true` when any of the waiting, delayed, stalled, or
    /// prioritized counters is non-zero.
    #[must_use]
    pub fn queue_has_work(&self) -> bool {
        [
            &self.waiting,
            &self.delayed,
            &self.stalled,
            &self.prioritized,
        ]
        .iter()
        .any(|counter| counter.load() > 0)
    }

    /// Returns the current value of the `is_paused` flag.
    #[must_use]
    pub fn queue_is_paused(&self) -> bool {
        self.is_paused.load()
    }

    /// Returns `true` when the processing counter is zero.
    #[must_use]
    pub fn workers_idle(&self) -> bool {
        self.processing.load() == 0
    }

    /// Returns `true` when the active counter is non-zero.
    #[must_use]
    pub fn has_active_jobs(&self) -> bool {
        self.active.load() != 0
    }

    /// Returns `true` when there is no pending work, no active job, no
    /// processing in flight, and at least one id has been issued.
    ///
    /// Freshly created (all-zero) metrics are therefore *not* idle.
    #[must_use]
    pub fn is_idle(&self) -> bool {
        if self.queue_has_work() || self.has_active_jobs() {
            return false;
        }
        self.workers_idle() && self.last_id.load() > 0
    }

    /// Resets everything that [`Self::update`] copies to its default value
    /// (counters to 0, event mode to the default). `is_paused` is untouched.
    pub fn clear(&self) {
        self.update(&Self::default());
    }
}