pub struct Queue<T: Task> { /* private fields */ }
Task queue.
Queues are responsible for managing task lifecycle.
Implementations§
impl<T: Task> Queue<T>
pub fn builder() -> Builder<T, Initial>
Creates a builder for a new queue.
§Example
use underway::Queue;
let pool = { /* A `PgPool`. */ };
let queue: Queue<ExampleTask> = Queue::builder()
    .name("example")
    .dead_letter_queue("example_dlq")
    .pool(pool)
    .build()
    .await?;
pub async fn enqueue<'a, E>(
    &self,
    executor: E,
    task: &T,
    input: &T::Input,
) -> Result<TaskId, Error>
where
    E: PgExecutor<'a>,
Enqueues a new task into the task queue, returning the task’s unique ID.
This function inserts a new task into the database using the provided executor. By using a transaction as the executor, you can ensure that the task is only enqueued if the transaction successfully commits.
The enqueued task will have its retry policy, timeout, time-to-live, delay, concurrency key, and priority configured as specified by the task type.
An ID, which is a ULID converted to UUIDv4, is also assigned to the task and returned upon successful enqueue.
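To illustrate why a ULID fits in a UUID-sized value, here is a minimal standard-library sketch. A ULID is 128 bits: a 48-bit millisecond timestamp followed by 80 bits of randomness. The sketch assumes the conversion is a plain reinterpretation of those 128 bits, which is an assumption and not confirmed by this page; the helper names `ulid_bits` and `as_uuid_string` are hypothetical and not part of Underway.

```rust
/// Packs a 48-bit millisecond timestamp and 80 bits of randomness into the
/// 128-bit ULID layout, with the timestamp in the high bits.
/// Hypothetical helper, for illustration only.
fn ulid_bits(timestamp_ms: u64, randomness: u128) -> u128 {
    let ts = (timestamp_ms as u128) & ((1u128 << 48) - 1);
    let rand = randomness & ((1u128 << 80) - 1);
    (ts << 80) | rand
}

/// Formats 128 bits in the hyphenated 8-4-4-4-12 UUID text form.
/// Assumption: the bits are reinterpreted as-is, with no version bits set.
fn as_uuid_string(bits: u128) -> String {
    let hex = format!("{:032x}", bits);
    format!(
        "{}-{}-{}-{}-{}",
        &hex[0..8],
        &hex[8..12],
        &hex[12..16],
        &hex[16..20],
        &hex[20..32]
    )
}
```

Because the timestamp occupies the high bits, IDs built this way sort by creation time, both as integers and in their text form.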
Note: If you pass a transactional executor and the transaction is rolled back, the returned task ID will not correspond to any persisted task.
§Errors
This function will return an error if:
- The input cannot be serialized to JSON.
- The database operation fails during the insertion.
- Any of timeout, delay, or ttl cannot be converted to std::time::Duration.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };
// Enqueue a new task with input.
let task_id = queue.enqueue(&pool, &task, &()).await?;
pub async fn enqueue_after<'a, E>(
    &self,
    executor: E,
    task: &T,
    input: &T::Input,
    delay: Span,
) -> Result<TaskId, Error>
where
    E: PgExecutor<'a>,
Same as enqueue, but the task doesn’t become available until after the specified delay.
Note: The provided delay is added to the task’s configured delay. For example, if you provide a five-minute delay and the task is already configured with a thirty-second delay, the task will not be dequeued for at least five and a half minutes.
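The arithmetic in the note can be sketched with the standard library alone. This assumes the two delays are simply summed, as the note describes; `effective_delay` is a hypothetical helper, not an Underway function, and it uses `std::time::Duration` in place of the `Span` type from the signature.

```rust
use std::time::Duration;

/// Returns the minimum wait before the task can be dequeued, assuming the
/// configured and provided delays are added together.
/// Hypothetical helper, for illustration only.
fn effective_delay(configured: Duration, provided: Duration) -> Duration {
    // Saturate instead of panicking if the sum overflows.
    configured.saturating_add(provided)
}
```

With a configured delay of thirty seconds and a provided delay of five minutes, the helper returns 330 seconds.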
§Example
use jiff::ToSpan;
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };
// Enqueue a new task with input after five minutes.
let task_id = queue.enqueue_after(&pool, &task, &(), 5.minutes()).await?;
pub async fn enqueue_multi<'a, E>(
    &self,
    executor: E,
    task: &T,
    inputs: &[T::Input],
) -> Result<usize, Error>
Enqueues tasks in chunks (max 5000 per batch) within a single transaction.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };
// Enqueue two tasks, one per input, in a single call.
let _ = queue.enqueue_multi(&pool, &task, &[(), ()]).await?;
pub async fn enqueue_multi_wth_chunk_size<'a, E>(
    &self,
    executor: E,
    task: &T,
    inputs: &[T::Input],
    chunk_size: usize,
) -> Result<usize, Error>
Same as enqueue_multi, but allows you to specify chunk_size.
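To show how a chunk size divides a set of inputs into batches, here is a rough standard-library sketch. It is not Underway's implementation: `batch_sizes` and `DEFAULT_CHUNK_SIZE` are hypothetical names, and treating a zero chunk size as invalid is a choice made for this sketch only.

```rust
/// Batch size taken from the enqueue_multi description (max 5000 per batch).
const DEFAULT_CHUNK_SIZE: usize = 5000;

/// Returns how many inputs each batch would contain, or `None` when
/// `chunk_size` is zero (`slice::chunks` panics on a zero chunk size).
/// Hypothetical helper, for illustration only.
fn batch_sizes<I>(inputs: &[I], chunk_size: usize) -> Option<Vec<usize>> {
    if chunk_size == 0 {
        return None;
    }
    Some(inputs.chunks(chunk_size).map(|chunk| chunk.len()).collect())
}
```

For 12,000 inputs and the default chunk size, this gives two full batches of 5,000 and a final batch of 2,000.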
pub async fn dequeue(&self) -> Result<Option<InProgressTask>, Error>
Dequeues the next available task.
This method uses the FOR UPDATE SKIP LOCKED clause to ensure efficient retrieval of pending task rows.
If an available task is found, it’s marked as “in progress”.
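The effect of skipping locked rows can be modelled in memory. The sketch below is an analogy only: it does not talk to Postgres, and `Row`, `State`, and `dequeue_skip_locked` are hypothetical names that are not part of Underway.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Pending,
    InProgress,
}

#[derive(Debug)]
struct Row {
    id: u32,
    state: State,
    /// True while another transaction holds a lock on this row.
    locked: bool,
}

/// Picks the first pending row that is not locked, marks it in progress,
/// and returns its ID. Locked rows are skipped instead of waited on.
/// Hypothetical helper, for illustration only.
fn dequeue_skip_locked(rows: &mut [Row]) -> Option<u32> {
    let row = rows
        .iter_mut()
        .find(|row| row.state == State::Pending && !row.locked)?;
    row.state = State::InProgress;
    Some(row.id)
}
```

A worker that finds the first row locked moves on to the next pending row without blocking, which is why several workers can dequeue concurrently without contending for the same task.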
§Transactions, locks, and timeouts
At the beginning of this operation, a new transaction is started (or, if already inside a transaction, a savepoint is created). This transaction is committed before returning. Note that the row locked by this transaction stays locked until the transaction is committed, rolled back, or otherwise reset, e.g. via a timeout.
The implication is that transactions should set a reasonable timeout so that tasks are returned to the queue within a reasonable time horizon. Note that Underway does not currently set any database-level timeouts.
§Errors
This function will return an error if:
- The database operation fails during select.
- The database operation fails during update.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
let task = { /* An implementer of `Task`. */ };
// Enqueue a new task.
queue.enqueue(&pool, &task, &()).await?;
// Dequeue the enqueued task.
let pending_task = queue
    .dequeue()
    .await?
    .expect("There should be a pending task.");
pub async fn schedule<'a, E>(
    &self,
    executor: E,
    zoned_schedule: &ZonedSchedule,
    input: &T::Input,
) -> Result<(), Error>
where
    E: PgExecutor<'a>,
Creates a schedule for the queue.
Schedules are useful when a task should be run periodically, according to a crontab definition.
Note: After a schedule has been set, a scheduler instance must be run in order for schedules to fire.
§Errors
This function will return an error if:
- The input value cannot be serialized.
- The database operation fails during insert.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
// Set a schedule on the queue with the given input.
let daily = "@daily[America/Los_Angeles]".parse()?;
queue.schedule(&pool, &daily, &()).await?;
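The schedule string in the example pairs a cron expression with a time zone name in square brackets. The sketch below only illustrates that shape by splitting the two parts; it is not Underway's parser, it validates neither part, and `split_zoned_schedule` is a hypothetical helper.

```rust
/// Splits "<cron expression>[<time zone>]" into its two parts.
/// Returns `None` if the brackets are missing or either part is empty.
/// Hypothetical helper, for illustration only.
fn split_zoned_schedule(s: &str) -> Option<(&str, &str)> {
    let s = s.trim();
    let open = s.find('[')?;
    let zone = s[open + 1..].strip_suffix(']')?;
    let expr = s[..open].trim_end();
    if expr.is_empty() || zone.is_empty() {
        return None;
    }
    Some((expr, zone))
}
```

Applied to the string from the example, it separates `@daily` from `America/Los_Angeles`.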
pub async fn unschedule<'a, E>(&self, executor: E) -> Result<(), Error>
where
    E: PgExecutor<'a>,
Removes the configured schedule for the queue, if one exists.
§Errors
This function will return an error if:
- The database operation fails while removing the schedule.
§Example
let pool = { /* A `PgPool`. */ };
let queue = { /* A `Queue`. */ };
// Unset the schedule if one was set.
queue.unschedule(&pool).await?;
Trait Implementations§
Auto Trait Implementations§
impl<T> Freeze for Queue<T>
impl<T> !RefUnwindSafe for Queue<T>
impl<T> Send for Queue<T>
impl<T> Sync for Queue<T>
where
    T: Sync,
impl<T> Unpin for Queue<T>
where
    T: Unpin,
impl<T> !UnwindSafe for Queue<T>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
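The behaviour of these two methods can be sketched with a stand-in type. The `Either` enum and the free functions below are local illustrations written for this sketch, not the definitions behind the blanket implementation, and the `FnOnce(&T) -> bool` bound is an assumption based on the into_left(&self) call in the description.

```rust
/// Stand-in for the two-variant `Either` type, for illustration only.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

/// Wraps `value` in `Left` when `into_left` is true, otherwise in `Right`.
fn into_either<T>(value: T, into_left: bool) -> Either<T, T> {
    if into_left {
        Either::Left(value)
    } else {
        Either::Right(value)
    }
}

/// Same as `into_either`, but decides by calling a predicate on the value.
fn into_either_with<T, F>(value: T, into_left: F) -> Either<T, T>
where
    F: FnOnce(&T) -> bool,
{
    let left = into_left(&value);
    into_either(value, left)
}
```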