Skip to main content

QueueStorage

Trait QueueStorage 

Source
/// Core trait for queue operations that a queue backend implements to work
/// with the worker engine.
///
/// Implementors must also implement `ResultStorage` and be `Send + Sync`.
/// The required methods return boxed, `Send` futures; the `'async_trait`
/// lifetime suggests these signatures are the expanded form of `async fn`
/// declarations (NOTE(review): presumably produced by the `async-trait`
/// macro; confirm against the crate source).
pub trait QueueStorage:
    ResultStorage
    + Send
    + Sync {
    // Required methods

    /// Enqueue an activity for immediate or scheduled execution.
    ///
    /// If `scheduled_at` is `Some`, the activity is stored for later
    /// execution; otherwise it should be immediately available for dequeue.
    fn enqueue<'life0, 'async_trait>(
        &'life0 self,
        activity: QueuedActivity,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Try to dequeue one activity for processing on behalf of `worker_id`.
    ///
    /// Resolves to `Ok(None)` if no activity is available within `timeout`.
    /// When `activity_types` is `Some`, only activities whose type matches
    /// one of the listed values are eligible; `None` means all types.
    fn dequeue<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        worker_id: &'life1 str,
        timeout: Duration,
        activity_types: Option<&'life2 [String]>,
    ) -> Pin<Box<dyn Future<Output = Result<Option<QueuedActivity>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Mark a dequeued activity as successfully completed, optionally
    /// supplying a `result` value.
    fn ack_success<'life0, 'life1, 'async_trait>(
        &'life0 self,
        activity_id: Uuid,
        result: Option<Value>,
        worker_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Mark a dequeued activity as failed.
    ///
    /// The backend decides on retry handling from `failure`. Resolves to
    /// `Ok(true)` if the activity was moved to the dead letter queue.
    fn ack_failure<'life0, 'life1, 'async_trait>(
        &'life0 self,
        activity_id: Uuid,
        failure: FailureKind,
        worker_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Move scheduled activities whose time has passed to the ready queue.
    ///
    /// Resolves to the number of activities processed. Backends whose
    /// `schedules_natively()` returns `true` should return `Ok(0)`.
    fn process_scheduled<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Requeue activities with expired leases back to the ready queue.
    ///
    /// Resolves to the number of activities requeued.
    // NOTE(review): `batch_size` presumably caps how many leases are handled
    // per call; the rendered docs do not say so explicitly. Confirm.
    fn requeue_expired<'life0, 'async_trait>(
        &'life0 self,
        batch_size: usize,
    ) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Extend the lease of an activity currently being processed by
    /// `extend_by`.
    ///
    /// Resolves to `Ok(true)` if the lease was extended, `Ok(false)` if the
    /// activity was not found.
    fn extend_lease<'life0, 'async_trait>(
        &'life0 self,
        activity_id: Uuid,
        extend_by: Duration,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Store the result of a completed activity.
    fn store_result<'life0, 'async_trait>(
        &'life0 self,
        activity_id: Uuid,
        result: ActivityResult,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Evaluate idempotency rules before enqueueing `activity`.
    ///
    /// Resolves to `Ok(None)` to proceed with the enqueue, `Ok(Some(id))` to
    /// return the existing activity, or `Err(..)` on conflict or error.
    fn check_idempotency<'life0, 'life1, 'async_trait>(
        &'life0 self,
        activity: &'life1 QueuedActivity,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Uuid>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    // Provided method

    /// Whether this backend handles scheduled activities natively in
    /// `dequeue()`. Defaults to `false`; when `true`, the worker engine does
    /// not start the scheduled-activities processor loop.
    fn schedules_natively(&self) -> bool { ... }
}
Expand description

Core trait for queue operations.

This trait defines the essential operations that any queue backend must implement to work with RunnerQ’s worker engine.

§Implementation Notes

  • All required methods are async and should be implemented efficiently (the provided schedules_natively method is synchronous)
  • Implementations should handle their own connection pooling
  • Errors should be mapped to StorageError variants
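  • An activity returned by dequeue must be acknowledged through the ack methods, which identify it by its activity_id together with the worker_id (the ack signatures take no separate lease_id parameter)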

Required Methods§

Source

fn enqueue<'life0, 'async_trait>( &'life0 self, activity: QueuedActivity, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enqueue an activity for immediate or scheduled execution.

If scheduled_at is Some, the activity should be stored for later execution at that time. Otherwise, it should be immediately available for dequeue.

Source

fn dequeue<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, worker_id: &'life1 str, timeout: Duration, activity_types: Option<&'life2 [String]>, ) -> Pin<Box<dyn Future<Output = Result<Option<QueuedActivity>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Try to dequeue one activity for processing.

Returns None if no activities are available within the timeout. The returned QueuedActivity must subsequently be acknowledged with ack_success or ack_failure, which take its activity_id and the worker_id.

When activity_types is Some, only activities whose activity_type matches one of the listed values are eligible. When None, all types are eligible.

Source

fn ack_success<'life0, 'life1, 'async_trait>( &'life0 self, activity_id: Uuid, result: Option<Value>, worker_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark a dequeued activity as successfully completed.

Source

fn ack_failure<'life0, 'life1, 'async_trait>( &'life0 self, activity_id: Uuid, failure: FailureKind, worker_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark a dequeued activity as failed.

The backend should handle retry logic based on the FailureKind:

  • Retryable: Schedule for retry if attempts remain, else move to DLQ
  • NonRetryable: Mark as failed immediately
  • Timeout: Treat as retryable

Returns true if the activity was moved to the dead letter queue.

Source

fn process_scheduled<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Process scheduled activities that are ready to run.

Should move activities whose scheduled_at time has passed to the ready queue. Returns the number of activities processed.

Backends that handle scheduling natively in dequeue() (i.e. schedules_natively() returns true) should return Ok(0) here. The worker engine will skip the polling loop entirely for such backends.

Source

fn requeue_expired<'life0, 'async_trait>( &'life0 self, batch_size: usize, ) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Requeue expired leases back to the ready queue.

This is the “reaper” function that handles activities whose workers crashed or timed out. Returns the number of activities requeued.

Source

fn extend_lease<'life0, 'async_trait>( &'life0 self, activity_id: Uuid, extend_by: Duration, ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Extend the lease for an activity currently being processed.

Returns true if the lease was extended, false if not found.

Source

fn store_result<'life0, 'async_trait>( &'life0 self, activity_id: Uuid, result: ActivityResult, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Store the result of a completed activity.

Source

fn check_idempotency<'life0, 'life1, 'async_trait>( &'life0 self, activity: &'life1 QueuedActivity, ) -> Pin<Box<dyn Future<Output = Result<Option<Uuid>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Evaluate idempotency rules before enqueueing.

Returns:

  • Ok(None): Proceed with enqueue
  • Ok(Some(id)): Return existing activity (for ReturnExisting behavior)
  • Err(...): Conflict or error

Provided Methods§

Source

fn schedules_natively(&self) -> bool

Whether this backend handles scheduled activities natively in dequeue().

When true, the worker engine will not start the scheduled-activities processor loop, since dequeue() already picks up due scheduled and retrying activities directly.

Defaults to false (the processor loop is started).

Implementors§