/// Core trait for queue operations.
///
/// Defines the essential operations that any queue backend must implement to
/// work with RunnerQ's worker engine. Implementors must also implement
/// `ResultStorage` and be `Send + Sync` (see the supertrait bounds below).
///
/// Every required method returns a pinned, boxed, `Send` future bounded by
/// `'async_trait`; the `'lifeN` parameters tie each borrowed argument to that
/// future's lifetime.
/// NOTE(review): this shape looks like the expansion produced by the
/// `async_trait` macro (i.e. the methods are `async fn` in the real source) —
/// confirm against the crate source.
///
/// Implementation notes from the accompanying documentation:
/// - All methods are async and should be implemented efficiently.
/// - Implementations should handle their own connection pooling.
/// - Errors should be mapped to `StorageError` variants.
pub trait QueueStorage:
ResultStorage
+ Send
+ Sync {
// Required methods

/// Enqueue an activity for immediate or scheduled execution.
///
/// If the activity's `scheduled_at` is `Some`, it should be stored for later
/// execution at that time; otherwise it should be immediately available for
/// `dequeue`.
fn enqueue<'life0, 'async_trait>(
&'life0 self,
activity: QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;

/// Try to dequeue one activity for processing on behalf of `worker_id`.
///
/// Resolves to `Ok(None)` if no activities are available within `timeout`.
/// When `activity_types` is `Some`, only activities whose `activity_type`
/// matches one of the listed values are eligible; when `None`, all types
/// are eligible.
///
/// NOTE(review): the page prose speaks of a returned `DequeuedActivity`
/// carrying a `lease_id` to pass to the ack methods, but this signature
/// yields `QueuedActivity` and the ack methods take `activity_id` plus
/// `worker_id` — confirm which description is current.
fn dequeue<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
worker_id: &'life1 str,
timeout: Duration,
activity_types: Option<&'life2 [String]>,
) -> Pin<Box<dyn Future<Output = Result<Option<QueuedActivity>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;

/// Mark a dequeued activity as successfully completed.
fn ack_success<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: Option<Value>,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;

/// Mark a dequeued activity as failed.
///
/// The backend should handle retry logic based on the `FailureKind`:
/// - `Retryable`: schedule for retry if attempts remain, else move to the DLQ.
/// - `NonRetryable`: mark as failed immediately.
/// - `Timeout`: treat as retryable.
///
/// Resolves to `Ok(true)` if the activity was moved to the dead letter queue.
fn ack_failure<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
failure: FailureKind,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;

/// Process scheduled activities that are ready to run.
///
/// Should move activities whose `scheduled_at` time has passed to the ready
/// queue and resolve to the number of activities processed. Backends for
/// which `schedules_natively()` returns `true` should return `Ok(0)`; the
/// worker engine skips the polling loop entirely for such backends.
fn process_scheduled<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;

/// Requeue expired leases back to the ready queue.
///
/// This is the "reaper" function that handles activities whose workers
/// crashed or timed out. Resolves to the number of activities requeued.
fn requeue_expired<'life0, 'async_trait>(
&'life0 self,
batch_size: usize,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;

/// Extend the lease for an activity currently being processed.
///
/// Resolves to `Ok(true)` if the lease was extended, `Ok(false)` if the
/// activity was not found.
fn extend_lease<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
extend_by: Duration,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;

/// Store the result of a completed activity.
fn store_result<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: ActivityResult,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;

/// Evaluate idempotency rules before enqueueing.
///
/// Resolves to:
/// - `Ok(None)`: proceed with enqueue.
/// - `Ok(Some(id))`: return the existing activity (for `ReturnExisting` behavior).
/// - `Err(...)`: conflict or error.
fn check_idempotency<'life0, 'life1, 'async_trait>(
&'life0 self,
activity: &'life1 QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<Option<Uuid>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;

// Provided method

/// Whether this backend handles scheduled activities natively in `dequeue()`.
///
/// When `true`, the worker engine does not start the scheduled-activities
/// processor loop, since `dequeue()` already picks up due scheduled and
/// retrying activities directly. Defaults to `false` (the processor loop is
/// started). The body is elided (`{ ... }`) in this rendered declaration.
fn schedules_natively(&self) -> bool { ... }
}Expand description
Core trait for queue operations.
This trait defines the essential operations that any queue backend must implement to work with RunnerQ’s worker engine.
§Implementation Notes
- All methods are async and should be implemented efficiently
- Implementations should handle their own connection pooling
- Errors should be mapped to `StorageError` variants
- The `lease_id` from `dequeue` must be passed to ack methods
Required Methods§
Sourcefn enqueue<'life0, 'async_trait>(
&'life0 self,
activity: QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn enqueue<'life0, 'async_trait>(
&'life0 self,
activity: QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Enqueue an activity for immediate or scheduled execution.
If scheduled_at is Some, the activity should be stored for later
execution at that time. Otherwise, it should be immediately available
for dequeue.
Sourcefn dequeue<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
worker_id: &'life1 str,
timeout: Duration,
activity_types: Option<&'life2 [String]>,
) -> Pin<Box<dyn Future<Output = Result<Option<QueuedActivity>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn dequeue<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
worker_id: &'life1 str,
timeout: Duration,
activity_types: Option<&'life2 [String]>,
) -> Pin<Box<dyn Future<Output = Result<Option<QueuedActivity>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Try to dequeue one activity for processing.
Returns None if no activities are available within the timeout.
The returned DequeuedActivity includes a lease_id that must be
passed to ack_success or ack_failure.
When activity_types is Some, only activities whose activity_type
matches one of the listed values are eligible. When None, all types
are eligible.
Sourcefn ack_success<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: Option<Value>,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn ack_success<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: Option<Value>,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Mark a dequeued activity as successfully completed.
Sourcefn ack_failure<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
failure: FailureKind,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn ack_failure<'life0, 'life1, 'async_trait>(
&'life0 self,
activity_id: Uuid,
failure: FailureKind,
worker_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Mark a dequeued activity as failed.
The backend should handle retry logic based on the FailureKind:
- `Retryable`: Schedule for retry if attempts remain, else move to DLQ
- `NonRetryable`: Mark as failed immediately
- `Timeout`: Treat as retryable
Returns true if the activity was moved to the dead letter queue.
Sourcefn process_scheduled<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn process_scheduled<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Process scheduled activities that are ready to run.
Should move activities whose scheduled_at time has passed to the
ready queue. Returns the number of activities processed.
Backends that handle scheduling natively in dequeue()
(i.e. schedules_natively() returns true)
should return Ok(0) here. The worker engine will skip the polling loop
entirely for such backends.
Sourcefn requeue_expired<'life0, 'async_trait>(
&'life0 self,
batch_size: usize,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn requeue_expired<'life0, 'async_trait>(
&'life0 self,
batch_size: usize,
) -> Pin<Box<dyn Future<Output = Result<u64, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Requeue expired leases back to the ready queue.
This is the “reaper” function that handles activities whose workers crashed or timed out. Returns the number of activities requeued.
Sourcefn extend_lease<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
extend_by: Duration,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn extend_lease<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
extend_by: Duration,
) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Extend the lease for an activity currently being processed.
Returns true if the lease was extended, false if not found.
Sourcefn store_result<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: ActivityResult,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn store_result<'life0, 'async_trait>(
&'life0 self,
activity_id: Uuid,
result: ActivityResult,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Store the result of a completed activity.
Sourcefn check_idempotency<'life0, 'life1, 'async_trait>(
&'life0 self,
activity: &'life1 QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<Option<Uuid>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn check_idempotency<'life0, 'life1, 'async_trait>(
&'life0 self,
activity: &'life1 QueuedActivity,
) -> Pin<Box<dyn Future<Output = Result<Option<Uuid>, StorageError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Evaluate idempotency rules before enqueueing.
Returns:
- `Ok(None)`: Proceed with enqueue
- `Ok(Some(id))`: Return existing activity (for `ReturnExisting` behavior)
- `Err(...)`: Conflict or error
Provided Methods§
Sourcefn schedules_natively(&self) -> bool
fn schedules_natively(&self) -> bool
Whether this backend handles scheduled activities natively in dequeue().
When true, the worker engine will not start the scheduled-activities
processor loop, since dequeue() already picks up due scheduled and
retrying activities directly.
Defaults to false (the processor loop is started).