Struct SimpleQueue 

pub struct SimpleQueue { /* private fields */ }

Queue engine struct. Use SimpleQueue::new(pool: PgPool) to create an instance.

The queue is configured through builder-style methods.

The following parameters are configurable:

  • heartbeat_interval - how often a running job is touched to indicate that it is still running. When a job stops being updated, it is recognized as stalled and requeued by a reaper worker.
  • empty_poll_sleep - how long to sleep between polls when the queue is empty. Should be low for high-throughput queues and high for low-throughput queues.
  • max_reprocess_count - how many times a job is reprocessed before being discarded. This is poison-job detection; the count increases only on job reschedule and stalled-job recovery.
  • janitor_interval - how often the janitor task runs (i.e. archives completed jobs and moves failed jobs to the dead queue).
  • hold_queue_semaphore - how long to hold a queue semaphore before releasing it. When a queue is congested, releasing immediately might cause the poller to re-pick the same jobs over and over, starving other queues.
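
The heartbeat idea behind heartbeat_interval can be sketched in isolation. This is a minimal, std-only illustration and not the crate's code: the function name is_stalled and the stall_after threshold are hypothetical, and how the reaper actually decides that a job is stalled is not stated on this page.

```rust
use std::time::{Duration, Instant};

/// Hypothetical check (not part of the crate): a job counts as stalled
/// when its last heartbeat is older than `stall_after`.
fn is_stalled(last_heartbeat: Instant, now: Instant, stall_after: Duration) -> bool {
    // `saturating_duration_since` yields zero if `now` is earlier than
    // `last_heartbeat`, so clock ordering issues never cause a panic.
    now.saturating_duration_since(last_heartbeat) > stall_after
}

/// Assumption for illustration only: tolerate two missed heartbeats
/// before treating the job as stalled.
fn stall_threshold(heartbeat_interval: Duration) -> Duration {
    heartbeat_interval * 2
}
```

With a 5-second heartbeat interval, this sketch treats a job as healthy 4 seconds after its last heartbeat and as stalled after 11 seconds.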

Implementations§

impl SimpleQueue

pub fn new(pool: PgPool) -> Self

Creates a new queue with the default configuration:

  • global semaphore with 500 permits (global limit)
  • queue semaphore (per-queue limit) with 100 permits
  • no custom semaphore strategy
  • heartbeat interval of 5 seconds
  • linear backoff strategy with a 5-second retry interval
  • poll sleep of 1 second when the table is empty
  • max reprocess count (poison/loop detection) of 100
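
The builder style used by the with_* methods can be illustrated with a small stand-in type. QueueConfig and its field names are hypothetical and exist only for this sketch; the real SimpleQueue keeps its fields private and needs a PgPool. The default values are the ones listed above.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the queue configuration. Field names are
/// illustrative and do not mirror the crate's private fields.
#[derive(Debug, Clone, PartialEq)]
struct QueueConfig {
    global_permits: usize,
    queue_permits: usize,
    heartbeat_interval: Duration,
    empty_poll_sleep: Duration,
    max_reprocess_count: usize,
}

impl QueueConfig {
    /// Start from the defaults documented for `SimpleQueue::new`.
    fn new() -> Self {
        Self {
            global_permits: 500,
            queue_permits: 100,
            heartbeat_interval: Duration::from_secs(5),
            empty_poll_sleep: Duration::from_secs(1),
            max_reprocess_count: 100,
        }
    }

    /// Builder methods take `self` by value and return it, so calls chain.
    fn with_global_semaphore(mut self, permits: usize) -> Self {
        self.global_permits = permits;
        self
    }

    fn with_empty_poll_sleep(mut self, duration: Duration) -> Self {
        self.empty_poll_sleep = duration;
        self
    }
}
```

Because every method consumes and returns the value, a call such as QueueConfig::new().with_global_semaphore(50).with_empty_poll_sleep(Duration::from_millis(100)) overrides two settings and leaves the remaining defaults untouched.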

pub async fn new_from_url(url: &str) -> Result<Self, Error>

Create a new SimpleQueue with a PostgreSQL pool created from a URL. See SimpleQueue::new for default configuration details.

pub fn with_global_semaphore(self, permits: usize) -> Self

Set the number of global semaphore permits

Default value: 500

pub fn with_queue_strategy(self, queue: String, strategy: impl JobStrategy + 'static) -> Self

Set the waiting strategy for a specific queue.

If not set, the queue uses the default_queue_strategy parameter.

pub fn with_queue_semaphore(self, queue: String, permits: usize) -> Self

Set the number of semaphore permits for a specific queue

If not set, the queue uses the queue_sem_count parameter, which is set by with_queue_default_semaphore_size.

pub fn with_queue_default_semaphore_size(self, permits: usize) -> Self

Set the default number of permits for queues

Default value: 100

pub fn with_heartbeat_interval(self, interval: Duration) -> Self

Set the heartbeat interval

Default value: 5 seconds.

pub fn with_default_backoff_strategy(self, strategy: BackoffStrategy) -> Self

Set the default backoff strategy.

See also BackoffStrategy.

Default strategy: BackoffStrategy::Linear with delay of 5 seconds.
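
As a rough illustration of linear backoff, the sketch below computes a retry delay from a fixed step. It is not the crate's implementation: linear_delay is a hypothetical helper, and this page does not say whether BackoffStrategy::Linear scales the delay with the attempt number or retries at a fixed interval. The sketch assumes the delay grows by one step per attempt.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of the crate): delay before retry number
/// `attempt` (1-based), assuming the delay grows by `step` on each attempt.
fn linear_delay(step: Duration, attempt: u32) -> Duration {
    // Saturate instead of panicking if the multiplication overflows.
    step.saturating_mul(attempt)
}
```

Under that assumption and the default 5-second step, the first retry waits 5 seconds and the third waits 15 seconds.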

pub fn with_default_queue_strategy(self, strategy: Arc<dyn JobStrategy>) -> Self

Set the default queue strategy

Default: sync::InstantStrategy

pub fn with_queue_backoff_strategy(self, queue: String, strategy: BackoffStrategy) -> Self

Set the backoff strategy for a specific queue

See also BackoffStrategy.

Default is taken from default_backoff_strategy.

pub fn with_empty_poll_sleep(self, duration: Duration) -> Self

Set the duration to sleep between polls when no jobs are found

This is used only when no jobs are found in the queue. Should be set to a low value for critical and high-throughput queues.

Default: 1 second.

pub fn with_max_reprocess_count(self, count: usize) -> Self

Set the maximum number of times a job can be reprocessed, e.g. rescheduled without consuming an attempt or recovered from a stalled state.

The reprocess count guards against infinite reschedules (which do not consume an attempt) and endless stalled-job recovery.

Default: 100.
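
The role of the reprocess count can be sketched as a small decision function. Outcome and on_reprocess are hypothetical names for this illustration; whether the crate discards a job when the count reaches the limit or only once it exceeds it is an assumption here.

```rust
/// What to do with a job that was rescheduled or recovered from a stall.
/// Hypothetical type for this sketch only.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Put the job back in the queue with the updated counter.
    Requeue { reprocess_count: usize },
    /// Treat the job as poison and stop reprocessing it.
    Discard,
}

/// Hypothetical decision: bump the counter, then discard the job once the
/// counter exceeds `max_reprocess_count` (boundary is an assumption).
fn on_reprocess(reprocess_count: usize, max_reprocess_count: usize) -> Outcome {
    let next = reprocess_count.saturating_add(1);
    if next > max_reprocess_count {
        Outcome::Discard
    } else {
        Outcome::Requeue { reprocess_count: next }
    }
}
```

Regular failed attempts are not modeled, since the counter is documented to increase only on reschedule and stalled-job recovery.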

pub fn with_janitor_interval(self, duration: Duration) -> Self

Set the interval between janitor runs.

The janitor moves completed jobs to the archive table and failed jobs to the dlq table.

Default: 1 minute.

pub fn with_hold_queue_semaphore(self, duration: Duration) -> Self

Set how long to hold the queue semaphore after acquisition.

While the queue semaphore is held, the spawned tokio task waits for this duration before giving up and releasing the semaphore. This is a hold-up mechanism so that the poller doesn't re-pick too quickly and prevent other queues from being processed.

Default: 500 milliseconds.

impl SimpleQueue

pub fn register_handler(&self, handler: impl Handler + 'static) -> &Self

Register a job handler for a specific queue.

impl SimpleQueue

pub async fn insert_job(&self, job: Job) -> Result<Option<Uuid>, BoxDynError>

Insert a job into the queue. If a job with the same unique key already exists, and it is still pending or running, the insert will be a no-op.
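
The unique-key behavior can be modeled with an in-memory map. This is a sketch under stated assumptions, not the crate's SQL: Jobs, Status, and the u64 ids are hypothetical stand-ins for the jobs table, its status column, and Uuid, and returning None for a no-op is an assumed reading of the Option in the return type.

```rust
use std::collections::HashMap;

/// Hypothetical job states for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
}

/// In-memory stand-in for the jobs table, keyed by unique key.
struct Jobs {
    by_key: HashMap<String, (u64, Status)>,
    next_id: u64,
}

impl Jobs {
    fn new() -> Self {
        Self { by_key: HashMap::new(), next_id: 0 }
    }

    /// Returns `Some(id)` when a job is stored and `None` when the insert is
    /// a no-op because a pending or running job already holds the key.
    fn insert(&mut self, unique_key: &str) -> Option<u64> {
        if let Some((_, status)) = self.by_key.get(unique_key) {
            if matches!(status, Status::Pending | Status::Running) {
                return None;
            }
        }
        let id = self.next_id;
        self.next_id += 1;
        // Simplification: a finished job's entry is overwritten here.
        self.by_key.insert(unique_key.to_string(), (id, Status::Pending));
        Some(id)
    }

    /// Test helper: move the job holding `unique_key` to a new status.
    fn set_status(&mut self, unique_key: &str, status: Status) {
        if let Some(entry) = self.by_key.get_mut(unique_key) {
            entry.1 = status;
        }
    }
}
```

In this model a second insert with the same key is ignored while the first job is pending or running, and is accepted again once that job has completed.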

pub async fn insert_jobs(&self, jobs: Vec<Job>) -> Result<Vec<Uuid>, BoxDynError>

pub async fn cancel_job_by_unique_key(&self, unique_key: &str) -> Result<(), BoxDynError>

pub async fn cancel_all_jobs_by_fingerprint(&self, fingerprint: &str) -> Result<(), BoxDynError>

impl SimpleQueue

pub async fn run(self: Arc<Self>, start_permit: Option<OwnedSemaphorePermit>) -> Result<(), BoxDynError>

Poll the queue for the next job to run, in a loop. If no jobs are found, sleep for empty_poll_sleep before retrying.

The passed start_permit is used to guarantee that the worker has started. This is mostly helpful in tests, where checks might fail if the queue is still warming up.

impl SimpleQueue

pub async fn insert_job_and_wait(&self, job: Job) -> Result<Option<(Receiver<()>, Uuid)>, BoxDynError>

Available on crate feature wait-for-job only.

Insert a single job into the queue and return a receiver that will be signaled when the job completes. If the insert was a no-op (e.g., due to a duplicate unique key), Ok(None) is returned.

Note: The receiver is signaled after the first processing attempt, regardless of whether the job succeeds or fails.
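
The note above means the signal carries no outcome. The sketch below shows that shape with std::sync::mpsc and a thread as stand-ins; run_and_notify is a hypothetical helper, and the crate's actual Receiver type and task spawning are not shown on this page.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical worker (not part of the crate): runs the job once and
/// signals the waiter whether or not the attempt succeeded.
fn run_and_notify<F>(job: F) -> mpsc::Receiver<()>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The result of the attempt is deliberately not forwarded.
        let _outcome = job();
        // Ignore the send error if the waiter dropped the receiver.
        let _ = tx.send(());
    });
    rx
}
```

Both a succeeding and a failing job produce the same unit signal here, so a caller that needs the outcome has to obtain it some other way.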

pub async fn insert_jobs_and_wait(&self, jobs: Vec<Job>) -> Result<Vec<(Receiver<()>, Uuid)>, BoxDynError>

Available on crate feature wait-for-job only.

Insert multiple jobs into the queue and return a receiver for each job that will be signaled when the corresponding job completes.

Note: Each receiver is signaled after the first processing attempt, regardless of whether the job succeeds or fails.

impl SimpleQueue

pub async fn reaper(&self) -> Reaper

Return a reaper::Reaper instance that fixes stale jobs.

Useful when you want to control the reaper::Reaper thread yourself.

pub async fn janitor(&self) -> Janitor

Available on crate feature janitor only.
