Queues provide an interface for managing task lifecycle.
Tasks are enqueued onto the queue, using the Queue::enqueue method, and
later dequeued, using the Queue::dequeue method, when they’re executed.
Each queue is identified by a unique name and manages a specific
implementation of Task. In other words, a given queue always manages
exactly one kind of task and only that kind of task.
Tasks are retrieved from the queue in order of insertion, first-in, first-out (FIFO), unless a task defines a priority. If a priority is defined, it is considered before the order in which the task was inserted.
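To make the ordering rule concrete, here is a minimal in-memory sketch of priority-then-FIFO retrieval. It is a model only, not how the queue is implemented: `PendingTask`, `dequeue_order`, and the insertion counter are hypothetical names, and it assumes that a larger priority value is served first.

```rust
use std::cmp::Reverse;

/// Hypothetical stand-in for a queued task; not a type from this crate.
#[derive(Debug, Clone, PartialEq)]
struct PendingTask {
    name: &'static str,
    /// Assumption: a larger value means the task is served earlier.
    priority: i32,
    /// Monotonically increasing counter recording insertion order.
    inserted_at: u64,
}

/// Returns task names in the order a priority-then-FIFO queue would hand them out.
fn dequeue_order(mut tasks: Vec<PendingTask>) -> Vec<&'static str> {
    // Priority is compared first (descending); insertion order breaks ties.
    tasks.sort_by_key(|t| (Reverse(t.priority), t.inserted_at));
    tasks.into_iter().map(|t| t.name).collect()
}
```

Under these assumptions, two tasks with equal priority come out in the order they were inserted, and any task with a higher priority comes out ahead of both.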
§Dead-letter queues
When a dead-letter queue name is provided, a secondary queue is created with this name. This is a queue of “dead letters”. In other words, it’s a queue of tasks that have failed and can’t be retried.
Dead-letter queues can be useful for identifying patterns of failures or reprocessing failed tasks at a later date.
To enable a dead-letter queue, provide its name:
use underway::Queue;
let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    // Enable the dead-letter queue.
    .dead_letter_queue("example_dlq")
    .pool(pool.clone())
    .build()
    .await?;
§Processing tasks
Note that while queues provide methods for enqueuing and dequeuing tasks, you will generally not use these methods directly and instead use jobs and their workers, respectively.
For example, Job provides an enqueue method,
which wraps its queue’s enqueue method. Likewise, when a job spins up a
worker via its run method, that worker uses its queue’s
dequeue method. In both cases there’s no need to use the queue methods
directly.
With that said, a queue may be interfaced with directly and operated manually if desired:
use underway::Queue;
let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    .pool(pool.clone())
    .build()
    .await?;
let task = { /* A type that implements `Task`. */ };
// Enqueue the task.
queue.enqueue(&pool, &task, &()).await?;
// Retrieve an available task for processing.
if let Some(in_progress_task) = queue.dequeue().await? {
    // Process the task here
}
Note that dequeuing a task will set its state to “in-progress” and create an
associated attempt row. Once dequeued in this manner, the state of the
in-progress task must be managed through the InProgressTask interface.
In particular, the task should eventually reach a terminal state.
Only tasks which are in the “pending” state or are otherwise considered stale (i.e. they have not updated their heartbeat within the expected time frame) and with remaining retries can be dequeued.
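The eligibility rule above can be read as a simple predicate. The following sketch is one reading of it, (pending or stale) and retries remaining, written against hypothetical types; `State`, `TaskRow`, `can_dequeue`, and all field names are illustrative and are not part of this crate's API.

```rust
use std::time::Duration;

/// Hypothetical task states, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Pending,
    InProgress,
    Succeeded,
}

/// Hypothetical view of a stored task; field names are assumptions.
struct TaskRow {
    state: State,
    /// Time elapsed since the task last reported a heartbeat.
    since_last_heartbeat: Duration,
    /// How often a healthy in-progress task is expected to report.
    expected_heartbeat: Duration,
    attempts_made: u32,
    max_attempts: u32,
}

/// One reading of the rule: (pending OR stale) AND retries remaining.
fn can_dequeue(row: &TaskRow) -> bool {
    // A task is treated as stale when it is in progress but has missed its heartbeat window.
    let stale = row.state == State::InProgress
        && row.since_last_heartbeat > row.expected_heartbeat;
    let has_retries = row.attempts_made < row.max_attempts;
    (row.state == State::Pending || stale) && has_retries
}
```

In this model an in-progress task that is still sending heartbeats is never handed out a second time, while one that has gone quiet becomes available again as long as it has attempts left.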
§Scheduling tasks
It’s important to note that a schedule must be set on the queue before the scheduler can be run.
As with task processing, jobs provide an interface for scheduling. For
example, the schedule method schedules the job.
Once scheduled, a scheduler must be run to ensure execution.
Of course it’s also possible to interface directly with the queue to achieve
the same if desired. Schedules can be set with the
Queue::schedule method. Once set, a scheduler can be
used to run the schedule via the Scheduler::run
method:
use underway::{Queue, Scheduler};
let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    .pool(pool.clone())
    .build()
    .await?;
// Set a quarter-hour schedule; IANA timezones are mandatory.
let quarter_hour = "0 */15 * * * *[America/Los_Angeles]".parse()?;
queue.schedule(&pool, &quarter_hour, &()).await?;
let task = { /* A type that implements `Task`. */ };
let scheduler = Scheduler::new(queue.into(), task);
// Run a scheduler based on our configured schedule.
scheduler.run().await?;
// Don't forget that there are no workers running, so even if we schedule work, nothing will
// happen!
§Deleting expired tasks
Tasks configure a time-to-live (TTL) which specifies how long they’ll remain
in the queue. However, tasks are only deleted from the queue if the deletion
routine is explicitly invoked. Either run_deletion or
run_deletion_every should be called to start the deletion routine.
Note: Tasks will not be deleted from the queue if this routine is not running!
use underway::queue;
let pool = { /* A `PgPool`. */ };
// Ensure we remove tasks that have an expired TTL.
queue::run_deletion(&pool).await?;
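The point that expiry and deletion are separate steps can be illustrated with a small in-memory sketch. This is not the crate's deletion routine: `StoredTask`, `is_expired`, and `sweep_expired` are hypothetical names, and the sketch assumes the TTL is measured from the task's creation time.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stored task; not a type from this crate.
struct StoredTask {
    created_at: SystemTime,
    /// Assumption: the TTL counts from `created_at`.
    ttl: Duration,
}

/// A task is expired once its age exceeds its TTL.
fn is_expired(task: &StoredTask, now: SystemTime) -> bool {
    now.duration_since(task.created_at)
        .map(|age| age > task.ttl)
        // A creation time in the future is treated as not expired.
        .unwrap_or(false)
}

/// Removes expired tasks and returns how many were removed.
/// Until this runs, expired tasks simply stay in the collection.
fn sweep_expired(tasks: &mut Vec<StoredTask>, now: SystemTime) -> usize {
    let before = tasks.len();
    tasks.retain(|t| !is_expired(t, now));
    before - tasks.len()
}
```

In this model a task whose TTL has elapsed still occupies storage until a sweep runs, which mirrors why the deletion routine has to be started explicitly.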
Structs§
- Builder
- Builds a Queue.
- InProgressTask
- Represents an in-progress task that’s been dequeued.
- Queue
- Task queue.
Enums§
- Error
- Queue errors.
Functions§
- graceful_shutdown
- Initiates a graceful shutdown by sending a NOTIFY to the underway_shutdown channel via the pg_notify function.
- run_deletion
- Runs deletion clean up of expired tasks every hour.
- run_deletion_every
- Runs deletion clean up of expired tasks in a loop, sleeping between deletions for the specified period.