Skip to main content

RedisBroker

Struct RedisBroker 

Source
pub struct RedisBroker { /* private fields */ }
Expand description

Redis-based broker implementation

Implementations§

Source§

impl RedisBroker

Source

pub fn new( redis_url: &str, queue_name: &str, ) -> Result<RedisBroker, CelersError>

Create a new Redis broker with FIFO mode

Source

pub fn with_mode( redis_url: &str, queue_name: &str, mode: QueueMode, ) -> Result<RedisBroker, CelersError>

Create a new Redis broker with specified queue mode

Source

pub fn from_config( config: &RedisConfig, queue_name: &str, mode: QueueMode, ) -> Result<RedisBroker, CelersError>

Create a new Redis broker from a RedisConfig

Source

pub async fn from_sentinel( sentinel: &SentinelClient, queue_name: &str, mode: QueueMode, ) -> Result<RedisBroker, CelersError>

Create a new Redis broker from a SentinelClient (for high availability)

Source

pub fn with_visibility_timeout(self, timeout_secs: u64) -> RedisBroker

Set visibility timeout (default: 300 seconds)

Source

pub fn mode(&self) -> QueueMode

Get the queue mode

Source

pub async fn dlq_size(&self) -> Result<usize, CelersError>

Get the number of tasks in the Dead Letter Queue

Source

pub async fn inspect_dlq( &self, limit: isize, ) -> Result<Vec<SerializedTask>, CelersError>

Inspect tasks in the Dead Letter Queue

Source

pub async fn replay_from_dlq(&self, task_id: &Uuid) -> Result<bool, CelersError>

Replay a task from the Dead Letter Queue back to the main queue

Source

pub async fn clear_dlq(&self) -> Result<usize, CelersError>

Clear all tasks from the Dead Letter Queue

Source

pub fn cancel_channel(&self) -> &str

Get the cancellation channel name (for workers to subscribe)

Source

pub async fn create_pubsub(&self) -> Result<PubSub, CelersError>

Create a PubSub connection for listening to cancellation messages

Source

pub fn health_checker(&self) -> HealthChecker

Create a health checker for monitoring Redis status

Source

pub fn queue_controller(&self) -> QueueController

Create a queue controller for pause/resume operations

Source

pub fn deduplicator(&self) -> Deduplicator

Create a task deduplicator

Source

pub fn script_manager(&self) -> ScriptManager

Create a script manager for Lua script optimization

Source

pub fn partition_manager( &self, num_partitions: usize, strategy: PartitionStrategy, ) -> PartitionManager

Create a partition manager for distributed queues

Source

pub fn batch_operations(&self) -> BatchOperations

Create a batch operations handler for advanced batch processing

Source

pub fn priority_manager(&self) -> PriorityManager

Create a priority manager for dynamic priority adjustments

Source

pub fn task_query(&self) -> TaskQuery

Create a task query interface for inspecting tasks

Source

pub fn backup_manager(&self) -> BackupManager

Create a backup manager for queue backup and restore

Source

pub async fn set_queue_ttl(&self, ttl_secs: u64) -> Result<(), CelersError>

Set TTL (time-to-live) for tasks in all queues

This helps prevent unbounded queue growth by automatically expiring old tasks. TTL is set in seconds.

Source

pub async fn cleanup_dlq( &self, _max_age_secs: u64, ) -> Result<usize, CelersError>

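Clean up old tasks from the DLQ (older than the specified age in seconds). Note: the parameter is named `_max_age_secs`; a leading underscore conventionally marks an unused parameter in Rust, so the age filter may not be applied yet. Verify against the implementation before relying on it.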

Source

pub async fn get_queue_stats(&self) -> Result<QueueStats, CelersError>

Get queue statistics (pending, processing, DLQ, delayed counts)

Source

pub async fn check_health(&self) -> RedisHealthStatus

Check Redis health status

Source

pub async fn ping(&self) -> Result<u64, CelersError>

Ping Redis and return latency in milliseconds

Source

pub fn visibility_timeout(&self) -> u64

Get the visibility timeout in seconds

Source

pub fn client(&self) -> &Client

Get the Redis client (for advanced operations)

Source

pub fn delayed_queue_name(&self) -> &str

Get the delayed queue name

Source

pub fn processing_queue_name(&self) -> &str

Get the processing queue name

Source

pub fn dlq_name(&self) -> &str

Get the DLQ name

Source

pub fn queue_name(&self) -> &str

Get the main queue name

Source

pub fn queue_names(&self) -> Vec<String>

Get all queue names managed by this broker

Source

pub async fn queue_exists(&self, queue_name: &str) -> Result<bool, CelersError>

Check if a specific queue exists and has tasks

Source

pub async fn get_queue_size_by_name( &self, queue_name: &str, ) -> Result<usize, CelersError>

Get the size of a specific queue by name

Source

pub async fn delete_queue(&self, queue_name: &str) -> Result<(), CelersError>

Delete a specific queue (use with caution!)

Source

pub async fn purge_all_queues(&self) -> Result<usize, CelersError>

Purge all queues (main, processing, DLQ, delayed)

Source

pub async fn get_all_queue_sizes( &self, ) -> Result<HashMap<String, usize>, CelersError>

Get a summary of all queue sizes

Source

pub async fn recover_processing_tasks(&self) -> Result<usize, CelersError>

Move all tasks from processing queue back to main queue

Useful for recovering tasks that were being processed when a worker crashed. Returns the number of tasks moved.

Source

pub async fn total_task_count(&self) -> Result<usize, CelersError>

Get the total number of tasks across all queues (main, processing, DLQ, delayed)

Source

pub async fn is_idle(&self) -> Result<bool, CelersError>

Check if the broker is idle (all queues empty)

Source

pub async fn estimate_memory_usage(&self) -> Result<usize, CelersError>

Estimate memory usage of tasks in all queues (in bytes)

This is an approximation based on serialized task sizes.

Source

pub async fn bulk_replay_from_dlq( &self, max_count: Option<usize>, ) -> Result<usize, CelersError>

Move tasks from DLQ back to main queue in bulk

Returns the number of tasks moved.

Source

pub async fn peek_next(&self) -> Result<Option<SerializedTask>, CelersError>

Peek at the next task without dequeueing it

Useful for inspecting what will be processed next without removing it from the queue.

Source

pub async fn queue_depth_percentage( &self, max_recommended_size: Option<usize>, ) -> Result<f64, CelersError>

Get queue depth percentage (current size / max recommended size)

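Despite the name, the result is a ratio rather than a 0–100 percentage: 0.0 means the queue is empty, 1.0 means it has reached max_recommended_size, and values above 1.0 mean it has exceeded that size. A result greater than 0.8 suggests the queue is getting full. max_recommended_size defaults to 10000 if not specified.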

Source

pub async fn is_near_capacity( &self, max_recommended_size: Option<usize>, ) -> Result<bool, CelersError>

Check if the queue is approaching capacity (> 80% of recommended max)

Trait Implementations§

Source§

impl Broker for RedisBroker

Source§

fn enqueue<'life0, 'async_trait>( &'life0 self, task: SerializedTask, ) -> Pin<Box<dyn Future<Output = Result<Uuid, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Enqueue a task to the broker
Source§

fn dequeue<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Option<BrokerMessage>, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Dequeue a task from the broker (blocking/waiting)
Source§

fn ack<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, task_id: &'life1 Uuid, receipt_handle: Option<&'life2 str>, ) -> Pin<Box<dyn Future<Output = Result<(), CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, RedisBroker: 'async_trait,

Acknowledge successful processing of a task
Source§

fn reject<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, task_id: &'life1 Uuid, receipt_handle: Option<&'life2 str>, requeue: bool, ) -> Pin<Box<dyn Future<Output = Result<(), CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, RedisBroker: 'async_trait,

Reject a task and potentially requeue it
Source§

fn queue_size<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<usize, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Get the current queue size
Source§

fn cancel<'life0, 'life1, 'async_trait>( &'life0 self, task_id: &'life1 Uuid, ) -> Pin<Box<dyn Future<Output = Result<bool, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, RedisBroker: 'async_trait,

Cancel a pending task
Source§

fn enqueue_batch<'life0, 'async_trait>( &'life0 self, tasks: Vec<SerializedTask>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Uuid>, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Enqueue multiple tasks in a single operation (batch) Read more
Source§

fn dequeue_batch<'life0, 'async_trait>( &'life0 self, count: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<BrokerMessage>, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Dequeue multiple tasks in a single operation (batch) Read more
Source§

fn ack_batch<'life0, 'life1, 'async_trait>( &'life0 self, tasks: &'life1 [(Uuid, Option<String>)], ) -> Pin<Box<dyn Future<Output = Result<(), CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, RedisBroker: 'async_trait,

Acknowledge multiple tasks in a single operation (batch) Read more
Source§

fn enqueue_at<'life0, 'async_trait>( &'life0 self, task: SerializedTask, execute_at: i64, ) -> Pin<Box<dyn Future<Output = Result<Uuid, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Schedule a task for execution at a specific Unix timestamp (seconds) Read more
Source§

fn enqueue_after<'life0, 'async_trait>( &'life0 self, task: SerializedTask, delay_secs: u64, ) -> Pin<Box<dyn Future<Output = Result<Uuid, CelersError>> + Send + 'async_trait>>
where 'life0: 'async_trait, RedisBroker: 'async_trait,

Schedule a task for execution after a delay (seconds) Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more