Crate redis_work_queue

source ·
Expand description

A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#).

This is the Rust implementations. For an overview of how the work queue works, it’s limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme.

§Setup

use redis_work_queue::{Item, KeyPrefix, WorkQueue};

let host = "your-redis-server";
let db = &mut redis::Client::open(format!("redis://{host}/"))?
    .get_async_connection()
    .await?;

let work_queue = WorkQueue::new(KeyPrefix::from("example_work_queue"));

§Adding work

§Creating Items

use redis_work_queue::Item;

// Create an item from `Box<[u8]>`
let box_item = Item::new(Box::new(*b"[1,2,3]"));

// Create an item from a `String`
let string_item = Item::from_string_data("[1,2,3]".to_string());

// Create an item from a serializable type
let json_item = Item::from_json_data(&[1, 2, 3]).unwrap();

assert_eq!(box_item.data, string_item.data);
assert_eq!(box_item.data, json_item.data);

// Parse an Item's data as json:
assert_eq!(box_item.data_json::<Vec<u32>>().unwrap(), vec![1, 2, 3]);

§Add an item to a work queue

work_queue.add_item(db, &item).await.expect("failed to add item to work queue");

§Completing work

Please read the documentation on leasing and completing items.

use std::time::Duration;

use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};

pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
    loop {
        // Wait for a job with no timeout and a lease time of 5 seconds.
        let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
        do_some_work(&job);
        work_queue.complete(db, &job);
    }
}

§Handling errors

Please read the documentation on handling errors.

use std::time::Duration;

use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};

pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
    loop {
        // Wait for a job with no timeout and a lease time of 5 seconds.
        let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
        match do_some_work(&job) {
            // Mark successful jobs as complete
            Ok(()) => {
                work_queue.complete(db, &job).await?;
            }
            // Drop a job that should be retried - it will be returned to the work queue after
            // the (5 second) lease expires.
            Err(err) if err.should_retry() => (),
            // Errors that shouldn't cause a retry should mark the job as complete so it isn't
            // tried again.
            Err(err) => {
                log_error(err);
                work_queue.complete(db, &job).await?;
            }
        }
    }
}

Structs§

  • An item for a work queue. Each item has an ID and associated data.
  • A string which should be prefixed to an identifier to generate a database key.
  • A work queue backed by a redis database