Crate redis_work_queue
source ·Expand description
A work queue, on top of a redis database, with implementations in Python, Rust, Go and C#.
This is the Rust implementations. For an overview of how the work queue works, it’s limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme.
Setup
use redis_work_queue::{Item, KeyPrefix, WorkQueue};
let host = "your-redis-server";
let db = &mut redis::Client::open(format!("redis://{host}/"))?
    .get_async_connection()
    .await?;
let work_queue = WorkQueue::new(KeyPrefix::from("example_work_queue"));Adding work
Creating Items
use redis_work_queue::Item;
// Create an item from `Box<[u8]>`
let box_item = Item::new(Box::new(*b"[1,2,3]"));
// Create an item from a `String`
let string_item = Item::from_string_data("[1,2,3]".to_string());
// Create an item from a serializable type
let json_item = Item::from_json_data(&[1, 2, 3]).unwrap();
assert_eq!(box_item.data, string_item.data);
assert_eq!(box_item.data, json_item.data);Add an item to a work queue
work_queue.add_item(db, &item).await.expect("failed to add item to work queue");Completing work
Please read the documentation on leasing and completing items.
use std::time::Duration;
use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};
pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
    loop {
        // Wait for a job with no timeout and a lease time of 5 seconds.
        let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
        do_some_work(&job);
        work_queue.complete(db, &job);
    }
}Handling errors
Please read the documentation on handling errors.
use std::time::Duration;
use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};
pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
    loop {
        // Wait for a job with no timeout and a lease time of 5 seconds.
        let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
        match do_some_work(&job) {
            // Mark successful jobs as complete
            Ok(()) => {
                work_queue.complete(db, &job).await?;
            }
            // Drop a job that should be retried - it will be returned to the work queue after
            // the (5 second) lease expires.
            Err(err) if err.should_retry() => (),
            // Errors that shouldn't cause a retry should mark the job as complete so it isn't
            // tried again.
            Err(err) => {
                log_error(err);
                work_queue.complete(db, &job).await?;
            }
        }
    }
}Structs
- An item for a work queue. Each item has an ID and associated data.
 - A string which should be prefixed to an identifier to generate a database key.
 - A work queue backed by a redis database