use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use parking_lot::Mutex;
use scc::HashMap as SccHashMap;
use tokio::sync::Notify;
use super::DeadJob;
use super::Job;
use super::QueueBuilder;
use super::QueueError;
use super::RetryPolicy;
#[cfg(feature = "signals")]
use super::signal_ids;
use super::worker::worker_loop;
#[cfg(feature = "signals")]
use crate::signals::Signal;
#[cfg(feature = "signals")]
use crate::signals::SignalArbiter;
/// One entry of the queue's pending list (`QueueInner::pending`).
pub(crate) struct PendingJob {
/// Identifier taken from `QueueInner::next_id` when the job is pushed.
pub(crate) id: u64,
/// Job name supplied by the caller of `push*`.
/// NOTE(review): presumably matched against the keys of `QueueInner::handlers`
/// by the worker — confirm in `worker_loop`.
pub(crate) name: String,
/// Payload bytes; the push paths in this file fill it with `serde_json::to_vec` output.
pub(crate) payload: Vec<u8>,
/// Attempt counter; every push path in this file initialises it to `0`.
pub(crate) attempt: u32,
/// Optional instant attached by `push_delayed` (`now + delay`); `None` for
/// `push` and `push_dedup`.
pub(crate) run_after: Option<Instant>,
/// Key set by `push_dedup` and compared against other pending jobs to avoid
/// enqueueing a duplicate; `None` for the other push paths.
pub(crate) dedup_key: Option<String>,
}
/// Type-erased job handler: a shared (`Arc`), `Send + Sync` closure that takes a
/// [`Job`] and returns a pinned, boxed, `Send` future resolving to
/// `Result<(), QueueError>`.
pub(crate) type BoxHandler =
Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send>> + Send + Sync>;
/// Shared state behind every [`Queue`] handle and every worker spawned by `Queue::start`.
pub(crate) struct QueueInner {
/// Jobs waiting in the queue; pushes append at the back under this mutex.
pub(crate) pending: Mutex<VecDeque<PendingJob>>,
/// Registered handlers keyed by job name (filled by `Queue::register`).
pub(crate) handlers: SccHashMap<String, BoxHandler>,
/// Dead-letter entries exposed through the `dead_letters*` accessors.
/// NOTE(review): populated outside this file's visible code — presumably by the worker.
pub(crate) dead_letters: Mutex<Vec<Arc<DeadJob>>>,
/// Signalled with `notify_one` after each push and `notify_waiters` on shutdown.
pub(crate) notify: Notify,
/// Source of job ids; incremented with `fetch_add(1)` on every new job.
pub(crate) next_id: AtomicU64,
/// Number of `worker_loop` tasks spawned by `Queue::start`.
pub(crate) num_workers: usize,
/// Retry configuration; not read by any code visible in this file.
pub(crate) retry_policy: RetryPolicy,
/// Set to `true` by `Queue::shutdown`; the push paths reject new jobs once it is set.
pub(crate) shutdown: AtomicBool,
/// Counter read by `Queue::shutdown` and `Queue::inflight_count`.
/// NOTE(review): presumably maintained by the worker while a job runs — confirm.
pub(crate) inflight: AtomicU64,
/// Awaited by `Queue::shutdown` while `inflight` is non-zero.
/// NOTE(review): presumably signalled by the worker when work drains — confirm.
pub(crate) drain_notify: Notify,
}
/// Cloneable handle to a job queue; all clones share the same [`QueueInner`]
/// through an `Arc`.
#[derive(Clone)]
pub struct Queue {
/// Shared queue state.
pub(crate) inner: Arc<QueueInner>,
}
impl Queue {
pub fn new() -> Self {
Self::builder().build()
}
/// Returns a [`QueueBuilder`] pre-populated with the defaults: 4 workers and
/// `RetryPolicy::default()`.
pub fn builder() -> QueueBuilder {
    let retry = RetryPolicy::default();
    QueueBuilder { workers: 4, retry }
}
pub fn register<F, Fut>(&self, name: impl Into<String>, handler: F)
where
F: Fn(Job) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), QueueError>> + Send + 'static,
{
let name = name.into();
let handler: BoxHandler = Arc::new(move |job| Box::pin(handler(job)));
let _ = self.inner.handlers.insert_sync(name, handler);
}
/// Enqueues a job named `name` with `payload` serialized as JSON, with no delay.
///
/// Declared `async` for API reasons; the body itself never awaits.
///
/// # Errors
/// Returns `QueueError::Shutdown` if the queue has been shut down, or
/// `QueueError::SerializeError` if `payload` cannot be serialized.
pub async fn push(
    &self,
    name: impl Into<String>,
    payload: &(impl serde::Serialize + ?Sized),
) -> Result<u64, QueueError> {
    let job_name: String = name.into();
    self.push_inner(job_name, payload, None)
}
/// Enqueues a job whose `run_after` is set to `Instant::now() + delay`.
///
/// Declared `async` for API reasons; the body itself never awaits.
///
/// # Errors
/// Returns `QueueError::Shutdown` if the queue has been shut down, or
/// `QueueError::SerializeError` if `payload` cannot be serialized.
///
/// # Panics
/// Panics if `Instant::now() + delay` overflows the platform's `Instant` range.
pub async fn push_delayed(
    &self,
    name: impl Into<String>,
    payload: &(impl serde::Serialize + ?Sized),
    delay: Duration,
) -> Result<u64, QueueError> {
    let job_name: String = name.into();
    let due_at = Instant::now() + delay;
    self.push_inner(job_name, payload, Some(due_at))
}
/// Enqueues a job unless a job with the same `dedup_key` is already pending.
///
/// When a pending job carries the same key, its id is returned and nothing is
/// enqueued. Only the pending list is scanned; jobs that are no longer in it are
/// not considered. Unlike `push_inner`, this path does not emit a
/// `QUEUE_JOB_QUEUED` signal.
///
/// # Errors
/// Returns `QueueError::Shutdown` if the queue has been shut down, or
/// `QueueError::SerializeError` if `payload` cannot be serialized.
pub async fn push_dedup(
    &self,
    name: impl Into<String>,
    payload: &(impl serde::Serialize + ?Sized),
    dedup_key: impl Into<String>,
) -> Result<u64, QueueError> {
    let inner = &self.inner;

    // Cheap rejection before paying for serialization.
    if inner.shutdown.load(Ordering::SeqCst) {
        return Err(QueueError::Shutdown);
    }

    let key: String = dedup_key.into();
    let name: String = name.into();
    let encoded = match serde_json::to_vec(payload) {
        Ok(bytes) => bytes,
        Err(err) => return Err(QueueError::SerializeError(err.to_string())),
    };

    let mut queue = inner.pending.lock();
    // Re-check under the lock: `shutdown` flips the flag while holding it.
    if inner.shutdown.load(Ordering::SeqCst) {
        return Err(QueueError::Shutdown);
    }
    let duplicate = queue
        .iter()
        .find(|job| job.dedup_key.as_deref() == Some(key.as_str()));
    if let Some(existing) = duplicate {
        return Ok(existing.id);
    }

    let id = inner.next_id.fetch_add(1, Ordering::SeqCst);
    queue.push_back(PendingJob {
        id,
        name,
        payload: encoded,
        attempt: 0,
        run_after: None,
        dedup_key: Some(key),
    });
    // Release the lock before waking a worker.
    drop(queue);

    inner.notify.notify_one();
    Ok(id)
}
/// Shared implementation of `push` and `push_delayed`.
///
/// Serializes `payload` to JSON, allocates an id, appends the job to the pending
/// list and wakes one waiter on `notify`. With the `signals` feature enabled, a
/// `QUEUE_JOB_QUEUED` signal carrying the job name and id is spawned afterwards.
///
/// # Errors
/// Returns `QueueError::Shutdown` if the shutdown flag is set (checked both before
/// serialization and again under the pending lock), or
/// `QueueError::SerializeError` if serialization fails.
fn push_inner(
    &self,
    name: String,
    payload: &(impl serde::Serialize + ?Sized),
    run_after: Option<Instant>,
) -> Result<u64, QueueError> {
    let inner = &self.inner;

    if inner.shutdown.load(Ordering::SeqCst) {
        return Err(QueueError::Shutdown);
    }

    let encoded = match serde_json::to_vec(payload) {
        Ok(bytes) => bytes,
        Err(err) => return Err(QueueError::SerializeError(err.to_string())),
    };
    let id = inner.next_id.fetch_add(1, Ordering::SeqCst);

    // The name is moved into the job below; keep a copy for the signal.
    #[cfg(feature = "signals")]
    let signal_name = name.clone();

    let job = PendingJob {
        id,
        name,
        payload: encoded,
        attempt: 0,
        run_after,
        dedup_key: None,
    };

    {
        let mut queue = inner.pending.lock();
        // Re-check under the lock: `shutdown` flips the flag while holding it.
        if inner.shutdown.load(Ordering::SeqCst) {
            return Err(QueueError::Shutdown);
        }
        queue.push_back(job);
    }

    inner.notify.notify_one();

    #[cfg(feature = "signals")]
    {
        let signal = Signal::with_capacity(signal_ids::QUEUE_JOB_QUEUED, 2)
            .meta("name", signal_name)
            .meta("id", id.to_string());
        let emit = SignalArbiter::emit_app(signal);
        #[cfg(not(feature = "compio"))]
        {
            tokio::spawn(emit);
        }
        #[cfg(feature = "compio")]
        {
            compio::runtime::spawn(emit).detach();
        }
    }

    Ok(id)
}
#[cfg(not(feature = "compio"))]
pub fn start(&self) {
for _ in 0..self.inner.num_workers {
let inner = self.inner.clone();
tokio::spawn(async move { worker_loop(inner).await });
}
tracing::debug!("Queue started with {} workers", self.inner.num_workers);
}
/// Spawns `num_workers` detached compio tasks, each running `worker_loop` on the
/// shared state.
#[cfg(feature = "compio")]
pub fn start(&self) {
    let worker_count = self.inner.num_workers;
    for _ in 0..worker_count {
        let shared = Arc::clone(&self.inner);
        compio::runtime::spawn(async move { worker_loop(shared).await }).detach();
    }
    tracing::debug!("Queue started with {} workers", worker_count);
}
pub async fn shutdown(&self, timeout: Duration) {
{
let _guard = self.inner.pending.lock();
self.inner.shutdown.store(true, Ordering::SeqCst);
}
self.inner.notify.notify_waiters();
if self.inner.inflight.load(Ordering::SeqCst) > 0 {
#[cfg(not(feature = "compio"))]
{
let _ = tokio::time::timeout(timeout, self.inner.drain_notify.notified()).await;
}
#[cfg(feature = "compio")]
{
let drain = std::pin::pin!(self.inner.drain_notify.notified());
let sleep = std::pin::pin!(compio::time::sleep(timeout));
let _ = futures_util::future::select(drain, sleep).await;
}
}
tracing::debug!("Queue shut down");
}
pub fn dead_letters(&self) -> Vec<DeadJob> {
self
.inner
.dead_letters
.lock()
.iter()
.map(|j| (**j).clone())
.collect()
}
/// Returns the dead-letter entries as shared pointers; only the `Arc`s are cloned,
/// not the underlying `DeadJob` values.
pub fn dead_letters_arc(&self) -> Vec<Arc<DeadJob>> {
    let letters = self.inner.dead_letters.lock();
    letters.iter().map(Arc::clone).collect()
}
/// Returns the number of entries currently in the dead-letter list.
pub fn dead_letter_count(&self) -> usize {
    let letters = self.inner.dead_letters.lock();
    letters.len()
}
/// Removes every entry from the dead-letter list.
pub fn clear_dead_letters(&self) {
    let mut letters = self.inner.dead_letters.lock();
    letters.clear();
}
/// Returns the number of jobs currently in the pending list.
pub fn pending_count(&self) -> usize {
    let queue = self.inner.pending.lock();
    queue.len()
}
/// Returns the current value of the in-flight counter (sequentially consistent load).
pub fn inflight_count(&self) -> u64 {
    let counter = &self.inner.inflight;
    counter.load(Ordering::SeqCst)
}
}
impl Default for Queue {
fn default() -> Self {
Self::new()
}
}