Struct Queue

Source
pub struct Queue<T: Task> { /* private fields */ }
Expand description

Task queue.

Queues are responsible for managing task lifecycle.

Implementations§

Source§

impl<T: Task> Queue<T>

Source

pub fn builder() -> Builder<T, Initial>

Creates a builder for a new queue.

§Example
use underway::Queue;

let pool = { /* A `PgPool`. */ };
let queue: Queue<ExampleTask> = Queue::builder()
    .name("example")
    .dead_letter_queue("example_dlq")
    .pool(pool)
    .build()
    .await?;
Source

pub async fn enqueue<'a, E>( &self, executor: E, task: &T, input: &T::Input, ) -> Result<TaskId, Error>
where E: PgExecutor<'a>,

Enqueues a new task into the task queue, returning the task’s unique ID.

This function inserts a new task into the database using the provided executor. By using a transaction as the executor, you can ensure that the task is only enqueued if the transaction successfully commits.

The enqueued task will have its retry policy, timeout, time-to-live, delay, concurrency key, and priority configured as specified by the task type.

An ID, which is a ULID converted to UUIDv4, is also assigned to the task and returned upon successful enqueue.

Note: If you pass a transactional executor and the transaction is rolled back, the returned task ID will not correspond to any persisted task.

§Errors

This function will return an error if:

  • The input cannot be serialized to JSON.
  • The database operation fails during the insertion.
  • Any of timeout, delay, or ttl cannot be converted to std::time::Duration.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };

// Enqueue a new task with input.
let task_id = queue.enqueue(&pool, &task, &()).await?;
Source

pub async fn enqueue_after<'a, E>( &self, executor: E, task: &T, input: &T::Input, delay: Span, ) -> Result<TaskId, Error>
where E: PgExecutor<'a>,

Same as enqueue, but the task doesn’t become available until after the specified delay.

Note: The provided delay is added to the task’s configured delay. This means that if you provide a five-minute delay and the task is already configured with a thirty-second delay the task will not be dequeued for at least five and half minutes.

§Example
use jiff::ToSpan;

let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };

// Enqueue a new task with input after five minutes.
let task_id = queue.enqueue_after(&pool, &task, &(), 5.minutes()).await?;
Source

pub async fn enqueue_multi<'a, E>( &self, executor: E, task: &T, inputs: &[T::Input], ) -> Result<usize, Error>
where E: PgExecutor<'a> + Acquire<'a, Database = Postgres>,

Enqueues tasks in chunks (max 5000 per batch) within a single transaction.

§Example

let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };

// Enqueue a new task with input after five minutes.
let _ = queue.enqueue_multi(&pool, &task, &[(), ()]).await?;
Source

pub async fn enqueue_multi_wth_chunk_size<'a, E>( &self, executor: E, task: &T, inputs: &[T::Input], chunk_size: usize, ) -> Result<usize, Error>
where E: PgExecutor<'a> + Acquire<'a, Database = Postgres>,

Same as enqueue_multi, but allows you to specify chunk_size

Source

pub async fn dequeue(&self) -> Result<Option<InProgressTask>, Error>

Dequeues the next available task.

This method uses the FOR UPDATE SKIP LOCKED clause to ensure efficient retrieval of pending task rows.

If an available task is found, it’s marked as “in progress”.

§Transactions, locks, and timeouts

At the beginning of this operation, a new transaction is started (otherwise if already in a transaction a savepoint is created). Before returning, this transaction is committed. It’s important to point out that the row locked by this transaction will be locked until the transaction is committed, rolled back, or is otherwise reset, e.g. via a timeout.

More specifically the implication is that transactions should set a reasonable timeout to ensure tasks are returned to the queue on a reasonable time horizon. Note that Underway does not currently set any database-level timeouts.

§Errors

This function will return an error if:

  • The database operation fails during select.
  • The database operation fails during update.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };

// Enqueue a new task.
queue.enqueue(&pool, &task, &()).await?;

// Dequeue the enqueued task.
let mut tx = pool.begin().await?;
let pending_task = queue
    .dequeue()
    .await?
    .expect("There should be a pending task.");
Source

pub async fn schedule<'a, E>( &self, executor: E, zoned_schedule: &ZonedSchedule, input: &T::Input, ) -> Result<(), Error>
where E: PgExecutor<'a>,

Creates a schedule for the queue.

Schedules are useful when a task should be run periodically, according to a crontab definition.

Note: After a schedule has been set, a scheduler instance must be run in order for schedules to fire.

§Errors

This function will return an error if:

  • The input value cannot be serialized.
  • The database operation fails during insert.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };

// Set a schedule on the queue with the given input.
let daily = "@daily[America/Los_Angeles]".parse()?;
queue.schedule(&pool, &daily, &()).await?;
Source

pub async fn unschedule<'a, E>(&self, executor: E) -> Result<(), Error>
where E: PgExecutor<'a>,

Removes the configured schedule for the queue, if one exsists..

§Errors

This function will return an error if:

  • The database operation fails during insert.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };

// Unset the schedule if one was set.
queue.unschedule(&pool).await?;

Trait Implementations§

Source§

impl<T: Task> Clone for Queue<T>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug + Task> Debug for Queue<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<T> Freeze for Queue<T>

§

impl<T> !RefUnwindSafe for Queue<T>

§

impl<T> Send for Queue<T>

§

impl<T> Sync for Queue<T>
where T: Sync,

§

impl<T> Unpin for Queue<T>
where T: Unpin,

§

impl<T> !UnwindSafe for Queue<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,