pub struct SimpleQueue { /* private fields */ }
Queue engine struct
Use SimpleQueue::new(pool: PgPool) to create an instance.
The queue is configurable through builder-style methods.
The following parameters are configurable:
- heartbeat_interval: how often a running job is touched to indicate that it is still running. When a job stops being updated, it is recognized as stalled and requeued by a reaper worker.
- empty_poll_sleep: how long to sleep between polls when the queue is empty. Should be low for high-throughput queues and high for low-throughput queues.
- max_reprocess_count: how many times a job will be reprocessed before being discarded (poison job detection). The count increases only on job reschedule and stalled job recovery.
- janitor_interval: how often the janitor task runs (i.e. archives completed jobs and moves failed jobs to the dead queue).
- hold_queue_semaphore: how long to hold a queue semaphore before releasing it. When a queue is congested, releasing immediately might cause the poller to re-pick the same jobs over and over, starving other queues.
Implementations
impl SimpleQueue
pub fn new(pool: PgPool) -> Self
Creates a new queue with the default configuration:
- global semaphore with 500 permits (global limit)
- queue semaphore (per-queue limit) with 100 permits
- no custom semaphore strategy
- heartbeat interval of 5 seconds
- linear backoff strategy with a 5s retry interval
- poll sleep of 1 second when the table is empty
- max reprocess count (poison/loop detection) of 100
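The builder style and the defaults listed above can be sketched with a stand-in type. This is only an illustration: `QueueConfig` and its field names are hypothetical and are not the crate's real types; only the default values and the `with_*` method names are taken from this page.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the queue's configuration (not the crate's type).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct QueueConfig {
    global_permits: usize,
    queue_permits: usize,
    heartbeat_interval: Duration,
    empty_poll_sleep: Duration,
    max_reprocess_count: usize,
}

impl QueueConfig {
    /// Defaults copied from the list in the documentation.
    fn new() -> Self {
        Self {
            global_permits: 500,
            queue_permits: 100,
            heartbeat_interval: Duration::from_secs(5),
            empty_poll_sleep: Duration::from_secs(1),
            max_reprocess_count: 100,
        }
    }

    /// Builder-style setter: consumes `self` and returns the updated value.
    fn with_global_semaphore(mut self, permits: usize) -> Self {
        self.global_permits = permits;
        self
    }

    /// Builder-style setter for the empty-queue poll sleep.
    fn with_empty_poll_sleep(mut self, duration: Duration) -> Self {
        self.empty_poll_sleep = duration;
        self
    }
}

fn main() {
    // Chain setters; anything not overridden keeps its default.
    let config = QueueConfig::new()
        .with_global_semaphore(200)
        .with_empty_poll_sleep(Duration::from_millis(100));
    println!("{config:?}");
}
```

Because each setter takes `self` by value and returns `Self`, calls can be chained directly on the constructor's result.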
pub async fn new_from_url(url: &str) -> Result<Self, Error>
Create a new SimpleQueue with a PostgreSQL pool created from a URL.
See SimpleQueue::new for default configuration details.
pub fn with_global_semaphore(self, permits: usize) -> Self
Set the number of global semaphore permits
Default value: 500
pub fn with_queue_strategy(
    self,
    queue: String,
    strategy: impl JobStrategy + 'static,
) -> Self
Set the queue waiting strategy for a specific queue.
If not set, the queue falls back to the default_queue_strategy parameter.
pub fn with_queue_semaphore(self, queue: String, permits: usize) -> Self
Set the number of semaphore permits for a specific queue
If not set, the queue uses the queue_sem_count parameter, which is set by with_queue_default_semaphore_size.
pub fn with_queue_default_semaphore_size(self, permits: usize) -> Self
Set the default number of permits for queues
Default value: 100
pub fn with_heartbeat_interval(self, interval: Duration) -> Self
Set the heartbeat interval
Default value: 5 seconds
pub fn with_default_backoff_strategy(self, strategy: BackoffStrategy) -> Self
Set the default backoff strategy.
See also BackoffStrategy.
Default strategy: BackoffStrategy::Linear with delay of 5 seconds.
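As a rough illustration of what a linear backoff with a 5 second delay could look like, the sketch below multiplies a base delay by the attempt number. This is an assumption: this page does not define how BackoffStrategy::Linear computes its delay, and `linear_backoff` is a hypothetical helper, not part of the crate.

```rust
use std::time::Duration;

/// Hypothetical linear backoff: the delay grows by `base` with each attempt.
/// Assumption: the crate's `BackoffStrategy::Linear` may compute this differently.
fn linear_backoff(base: Duration, attempt: u32) -> Duration {
    // Treat attempt 0 like attempt 1 so the delay is never zero,
    // and saturate instead of overflowing for very large attempt numbers.
    base.saturating_mul(attempt.max(1))
}

fn main() {
    let base = Duration::from_secs(5);
    for attempt in 1..=3 {
        println!("attempt {attempt}: wait {:?}", linear_backoff(base, attempt));
    }
}
```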
pub fn with_default_queue_strategy(self, strategy: Arc<dyn JobStrategy>) -> Self
Set the default queue strategy
Default: sync::InstantStrategy
pub fn with_queue_backoff_strategy(
    self,
    queue: String,
    strategy: BackoffStrategy,
) -> Self
Set the backoff strategy for a specific queue
See also BackoffStrategy.
Default is taken from default_backoff_strategy.
pub fn with_empty_poll_sleep(self, duration: Duration) -> Self
Set the duration to sleep between polls when no jobs are found.
This applies only when the queue is empty. Set it to a low value for critical and high-throughput queues.
Default: 1 second.
pub fn with_max_reprocess_count(self, count: usize) -> Self
Set the maximum number of times a job can be reprocessed, e.g. rescheduled without consuming an attempt or recovered from a stalled state.
The reprocess count guards against infinite reschedules (which do not consume an attempt) and repeated stalled-job recovery.
Default: 100.
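The reprocess limit can be pictured as a counter that is checked on every reschedule or stalled-job recovery. The sketch below is a simplified model under stated assumptions: `JobState`, `Outcome`, and `reprocess` are hypothetical names, and the exact boundary (whether the job is discarded at or after the limit) is an assumption, not something this page specifies.

```rust
/// Hypothetical job record; the field name is illustrative only.
#[derive(Debug)]
struct JobState {
    reprocess_count: usize,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Requeued,
    Discarded,
}

/// Called when a job is rescheduled or recovered from a stalled state.
/// Assumption: a job that has already been reprocessed `max_reprocess_count`
/// times is discarded instead of being requeued again.
fn reprocess(job: &mut JobState, max_reprocess_count: usize) -> Outcome {
    if job.reprocess_count >= max_reprocess_count {
        return Outcome::Discarded;
    }
    job.reprocess_count += 1;
    Outcome::Requeued
}

fn main() {
    let mut job = JobState { reprocess_count: 0 };
    // With a limit of 2, the third reprocess attempt discards the job.
    for _ in 0..3 {
        println!("{:?}", reprocess(&mut job, 2));
    }
}
```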
pub fn with_janitor_interval(self, duration: Duration) -> Self
Set the interval between janitor runs.
The janitor moves completed jobs to the archive table and failed jobs to the DLQ table.
Default: 1 minute.
pub fn with_hold_queue_semaphore(self, duration: Duration) -> Self
Set how long to hold an acquired queue semaphore.
While the queue semaphore is held, the spawned tokio job waits this long before giving up and releasing the semaphore. This hold-up mechanism keeps the poller from re-picking too quickly, which would prevent other queues from being processed.
Default: 500 milliseconds.
impl SimpleQueue
pub fn register_handler(&self, handler: impl Handler + 'static) -> &Self
Register a job handler for a specific queue.
impl SimpleQueue
pub async fn insert_job(&self, job: Job) -> Result<Option<Uuid>, BoxDynError>
Insert a job into the queue. If a job with the same unique key already exists, and it is still pending or running, the insert will be a no-op.
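The unique-key behaviour can be modelled in memory to make the no-op case concrete. This is a sketch under stated assumptions, not the crate's implementation: `InMemoryQueue`, `Status`, and `set_status` are hypothetical, a `u64` stands in for the `Uuid`, and the real queue stores jobs in PostgreSQL.

```rust
use std::collections::HashMap;

/// Hypothetical job status; only the pending/running distinction matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
}

/// Hypothetical in-memory stand-in for the job table, keyed by unique key.
#[derive(Default)]
struct InMemoryQueue {
    next_id: u64,
    by_unique_key: HashMap<String, (u64, Status)>,
}

impl InMemoryQueue {
    /// Returns `None` (a no-op) when a pending or running job already holds the key.
    fn insert_job(&mut self, unique_key: &str) -> Option<u64> {
        if let Some((_, status)) = self.by_unique_key.get(unique_key) {
            if matches!(status, Status::Pending | Status::Running) {
                return None;
            }
        }
        self.next_id += 1;
        self.by_unique_key
            .insert(unique_key.to_string(), (self.next_id, Status::Pending));
        Some(self.next_id)
    }

    /// Test helper that simulates the job moving through its lifecycle.
    fn set_status(&mut self, unique_key: &str, status: Status) {
        if let Some(entry) = self.by_unique_key.get_mut(unique_key) {
            entry.1 = status;
        }
    }
}

fn main() {
    let mut queue = InMemoryQueue::default();
    println!("{:?}", queue.insert_job("report-42")); // first insert succeeds
    queue.set_status("report-42", Status::Running);
    println!("{:?}", queue.insert_job("report-42")); // duplicate while running
    queue.set_status("report-42", Status::Completed);
    println!("{:?}", queue.insert_job("report-42")); // key is free again
}
```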
pub async fn insert_jobs(&self, jobs: Vec<Job>) -> Result<Vec<Uuid>, BoxDynError>
pub async fn cancel_job_by_unique_key(&self, unique_key: &str) -> Result<(), BoxDynError>
pub async fn cancel_all_jobs_by_fingerprint(&self, fingerprint: &str) -> Result<(), BoxDynError>
impl SimpleQueue
pub async fn run(
    self: Arc<Self>,
    start_permit: Option<OwnedSemaphorePermit>,
) -> Result<(), BoxDynError>
Poll the queue for the next job to run, in a loop. If no jobs are found, sleep for empty_poll_sleep before retrying.
The passed start_permit is used to guarantee that the worker has started. This is mostly helpful in tests, where checks might fail if the queue is still warming up.
impl SimpleQueue
pub async fn insert_job_and_wait(
    &self,
    job: Job,
) -> Result<Option<(Receiver<()>, Uuid)>, BoxDynError>
Available on crate feature wait-for-job only.
Insert a single job into the queue and return a receiver that will be signaled when the job completes. If the insert was a no-op (e.g., due to a duplicate unique key), Ok(None) is returned.
Note: The receiver is signaled after the first processing attempt, regardless of whether the job succeeds or fails.
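The signaling behaviour described in the note can be sketched with a standard-library channel. This is an illustration only: `spawn_and_wait` is a hypothetical helper, a thread stands in for the queue worker, and the crate's actual `Receiver` type is not identified on this page (it is likely an async channel rather than `std::sync::mpsc`).

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Runs `attempt` once on a worker thread and signals the receiver afterwards,
/// whether the attempt succeeded or failed.
fn spawn_and_wait<F>(attempt: F) -> Receiver<()>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // The result is deliberately not inspected: the signal is sent either way.
        let _result = attempt();
        // Ignore send errors: the caller may have dropped the receiver.
        let _ = tx.send(());
    });
    rx
}

fn main() {
    let ok = spawn_and_wait(|| Ok(()));
    let failed = spawn_and_wait(|| Err("boom".to_string()));
    ok.recv().expect("signaled after success");
    failed.recv().expect("signaled after failure");
    println!("both receivers were signaled");
}
```

Because the signal carries no payload, a caller that needs the job's outcome would have to look it up separately, for example by the returned job id.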
pub async fn insert_jobs_and_wait(
    &self,
    jobs: Vec<Job>,
) -> Result<Vec<(Receiver<()>, Uuid)>, BoxDynError>
Available on crate feature wait-for-job only.
Insert multiple jobs into the queue and return a receiver for each job that will be signaled when the corresponding job completes.
Note: Each receiver is signaled after the first processing attempt, regardless of whether the job succeeds or fails.
impl SimpleQueue
pub async fn reaper(&self) -> Reaper
Return a reaper::Reaper instance that recovers stalled jobs.
Useful when you want to control the reaper::Reaper thread yourself.
pub async fn janitor(&self) -> Janitor
Available on crate feature janitor only.
Auto Trait Implementations
impl Freeze for SimpleQueue
impl !RefUnwindSafe for SimpleQueue
impl Send for SimpleQueue
impl Sync for SimpleQueue
impl Unpin for SimpleQueue
impl UnsafeUnpin for SimpleQueue
impl !UnwindSafe for SimpleQueue
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.