Module queue


Queues provide an interface for managing task lifecycle.

Tasks are enqueued onto the queue, using the Queue::enqueue method, and later dequeued, using the Queue::dequeue method, when they’re executed.

Each queue is identified by a unique name and manages a specific implementation of Task. In other words, a given queue always manages exactly one kind of task and only that kind of task.

The order in which tasks are retrieved from the queue is determined either by insertion order, that is first-in, first-out (FIFO), or by the priority the task defines. If a priority is defined, it takes precedence over insertion order.
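The ordering rule above can be modeled in a few lines of plain Rust. This is only an illustrative sketch, not how the crate implements dequeuing: `QueuedTask`, `dequeue_order`, and the `inserted_at` counter are hypothetical names, and the sketch assumes that a numerically higher priority is served first.

```rust
use std::cmp::Reverse;

/// Hypothetical stand-in for a queued task row; not an `underway` type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedTask {
    pub id: u32,
    /// Assumption for this sketch: higher values are dequeued first.
    pub priority: i32,
    /// Monotonic insertion counter standing in for a creation timestamp.
    pub inserted_at: u64,
}

/// Returns task IDs in the order they would be retrieved:
/// highest priority first, then oldest insertion first (FIFO).
pub fn dequeue_order(mut tasks: Vec<QueuedTask>) -> Vec<u32> {
    tasks.sort_by_key(|t| (Reverse(t.priority), t.inserted_at));
    tasks.into_iter().map(|t| t.id).collect()
}
```

When every task carries the same priority, the sort key reduces to `inserted_at` alone, which is plain FIFO.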

§Dead-letter queues

When a dead-letter queue name is provided, a secondary queue is created with this name. This is a queue of “dead letters”. In other words, it’s a queue of tasks that have failed and can’t be retried.

Dead-letter queues can be useful for identifying patterns of failures or for reprocessing failed tasks at a later date.

To enable a dead-letter queue, provide its name:

use underway::Queue;

let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    // Enable the dead-letter queue.
    .dead_letter_queue("example_dlq")
    .pool(pool.clone())
    .build()
    .await?;
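As a rough mental model of what happens to a failed task, the decision can be sketched as a small pure function. Everything here is hypothetical and only illustrates the description above: `FailureRoute`, `route_failure`, and the `attempts`/`max_attempts` counters are not part of the crate's API, and the sketch assumes that "can't be retried" means the attempt count has reached a configured maximum.

```rust
/// Hypothetical outcome of a failed attempt; not an `underway` type.
#[derive(Debug, PartialEq, Eq)]
pub enum FailureRoute {
    /// The task still has retries left and stays on its own queue.
    Retry,
    /// Retries are exhausted and a dead-letter queue name was provided.
    DeadLetter(String),
    /// Retries are exhausted and no dead-letter queue was configured.
    Failed,
}

/// Decides where a task goes after a failed attempt.
/// Assumption: a task can be retried while `attempts < max_attempts`.
pub fn route_failure(attempts: u32, max_attempts: u32, dlq_name: Option<&str>) -> FailureRoute {
    if attempts < max_attempts {
        return FailureRoute::Retry;
    }
    match dlq_name {
        Some(name) => FailureRoute::DeadLetter(name.to_string()),
        None => FailureRoute::Failed,
    }
}
```

Under these assumptions, `route_failure(3, 3, Some("example_dlq"))` yields `FailureRoute::DeadLetter("example_dlq".to_string())`, while the same call with `None` simply reports the task as failed.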

§Processing tasks

Note that while queues provide methods for enqueuing and dequeuing tasks, you will generally not use these methods directly. Instead, you will use jobs and their workers, respectively.

For example, Job provides an enqueue method, which wraps its queue’s enqueue method. Likewise, when a job spins up a worker via its run method, that worker uses its queue’s dequeue method. In both cases there’s no need to use the queue methods directly.

With that said, a queue may be interfaced with directly and operated manually if desired:

use underway::Queue;

let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    .pool(pool.clone())
    .build()
    .await?;

let task = { /* A type that implements `Task`. */ };

// Enqueue the task.
queue.enqueue(&pool, &task, &()).await?;

// Retrieve an available task for processing.
if let Some(in_progress_task) = queue.dequeue().await? {
    // Process the task here
}

Note that dequeuing a task will set its state to “in-progress” and create an associated attempt row. Once dequeued in this manner, the state of the in-progress task must be managed through the InProgressTask interface. In particular, the task should eventually reach a terminal state.

Only tasks which are in the “pending” state or are otherwise considered stale (i.e. they have not updated their heartbeat within the expected time frame) and with remaining retries can be dequeued.
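The eligibility rule can be written down as a predicate. This is a sketch under stated assumptions rather than the crate's actual query: `State`, `TaskRow`, `is_dequeuable`, and all field names are hypothetical, a stale task is taken to be an in-progress task whose last heartbeat is older than its timeout, and the remaining-retries condition is assumed to apply to pending and stale tasks alike.

```rust
use std::time::Duration;

/// Hypothetical task states; not the crate's own enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Pending,
    InProgress,
    Succeeded,
    Failed,
}

/// Hypothetical view of a task row used only for this sketch.
pub struct TaskRow {
    pub state: State,
    /// Time elapsed since the task last updated its heartbeat.
    pub since_last_heartbeat: Duration,
    /// How long a heartbeat may be missing before the task counts as stale.
    pub heartbeat_timeout: Duration,
    pub attempts: u32,
    pub max_attempts: u32,
}

/// Returns true when the task is pending or stale, and still has retries left.
pub fn is_dequeuable(row: &TaskRow) -> bool {
    let has_retries = row.attempts < row.max_attempts;
    let stale = row.state == State::InProgress
        && row.since_last_heartbeat > row.heartbeat_timeout;
    (row.state == State::Pending || stale) && has_retries
}
```

A task in a terminal state (`Succeeded` or `Failed` here) is never eligible, which is why an in-progress task should eventually be moved to one.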

§Scheduling tasks

It’s important to note that a schedule must be set on the queue before the scheduler can be run.

As with task processing, jobs provide an interface for scheduling. For example, the schedule method schedules the job. Once scheduled, a scheduler must be run to ensure execution.

Of course it’s also possible to interface directly with the queue to achieve the same if desired. Schedules can be set with the Queue::schedule method. Once set, a scheduler can be used to run the schedule via the Scheduler::run method:

use underway::{Queue, Scheduler};

let pool = { /* A `PgPool`. */ };
let queue = Queue::builder()
    .name("example_queue")
    .pool(pool.clone())
    .build()
    .await?;

// Set a quarter-hour schedule; IANA timezones are mandatory.
let quarter_hour = "0 */15 * * * *[America/Los_Angeles]".parse()?;
queue.schedule(&pool, &quarter_hour, &()).await?;

let task = { /* A type that implements `Task`. */ };
let scheduler = Scheduler::new(queue.into(), task);

// Run a scheduler based on our configured schedule.
scheduler.run().await?;

// Don't forget that there are no workers running, so even if we schedule work, nothing will
// happen!

§Deleting expired tasks

Tasks configure a time-to-live (TTL) which specifies how long they’ll remain in the queue. However, tasks are only deleted from the queue if the deletion routine is explicitly invoked. Either run_deletion or run_deletion_every should be called to start the deletion routine.

Note: Tasks will not be deleted from the queue if this routine is not running!

use underway::queue;

let pool = { /* A `PgPool`. */ };

// Ensure we remove tasks that have an expired TTL.
queue::run_deletion(&pool).await?;
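To make the "nothing is deleted unless the routine runs" point concrete, here is a minimal in-memory sketch of a single deletion pass. It is not the crate's implementation: `StoredTask` and `delete_expired` are hypothetical, and the sketch assumes the TTL is measured from the moment a task was enqueued.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a stored task row; not an `underway` type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredTask {
    pub id: u32,
    /// Time elapsed since the task was enqueued (assumed TTL reference point).
    pub age: Duration,
    /// The task's configured time-to-live.
    pub ttl: Duration,
}

/// Removes every task whose age exceeds its TTL and returns how many were removed.
/// Expired tasks stay in `tasks` until this function is actually called.
pub fn delete_expired(tasks: &mut Vec<StoredTask>) -> usize {
    let before = tasks.len();
    tasks.retain(|t| t.age <= t.ttl);
    before - tasks.len()
}
```

An expired task remains in the vector until `delete_expired` is invoked, mirroring the note above that expiry alone does not remove anything.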

Structs§

Builder
Builds a Queue.
InProgressTask
Represents an in-progress task that’s been dequeued.
Queue
Task queue.

Enums§

Error
Queue errors.

Functions§

graceful_shutdown
Initiates a graceful shutdown by sending a NOTIFY to the underway_shutdown channel via the pg_notify function.
run_deletion
Runs deletion clean up of expired tasks every hour.
run_deletion_every
Runs deletion clean up of expired tasks in a loop, sleeping between deletions for the specified period.