Crate redis_work_queue
source ·Expand description
A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#).
This is the Rust implementations. For an overview of how the work queue works, it’s limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme.
§Setup
use redis_work_queue::{Item, KeyPrefix, WorkQueue};
let host = "your-redis-server";
let db = &mut redis::Client::open(format!("redis://{host}/"))?
.get_async_connection()
.await?;
let work_queue = WorkQueue::new(KeyPrefix::from("example_work_queue"));
§Adding work
§Creating Item
s
use redis_work_queue::Item;
// Create an item from `Box<[u8]>`
let box_item = Item::new(Box::new(*b"[1,2,3]"));
// Create an item from a `String`
let string_item = Item::from_string_data("[1,2,3]".to_string());
// Create an item from a serializable type
let json_item = Item::from_json_data(&[1, 2, 3]).unwrap();
assert_eq!(box_item.data, string_item.data);
assert_eq!(box_item.data, json_item.data);
// Parse an Item's data as json:
assert_eq!(box_item.data_json::<Vec<u32>>().unwrap(), vec![1, 2, 3]);
§Add an item to a work queue
work_queue.add_item(db, &item).await.expect("failed to add item to work queue");
§Completing work
Please read the documentation on leasing and completing items.
use std::time::Duration;
use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};
pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
loop {
// Wait for a job with no timeout and a lease time of 5 seconds.
let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
do_some_work(&job);
work_queue.complete(db, &job);
}
}
§Handling errors
Please read the documentation on handling errors.
use std::time::Duration;
use redis::{AsyncCommands, RedisResult};
use redis_work_queue::{Item, WorkQueue};
pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
loop {
// Wait for a job with no timeout and a lease time of 5 seconds.
let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
match do_some_work(&job) {
// Mark successful jobs as complete
Ok(()) => {
work_queue.complete(db, &job).await?;
}
// Drop a job that should be retried - it will be returned to the work queue after
// the (5 second) lease expires.
Err(err) if err.should_retry() => (),
// Errors that shouldn't cause a retry should mark the job as complete so it isn't
// tried again.
Err(err) => {
log_error(err);
work_queue.complete(db, &job).await?;
}
}
}
}
Structs§
- An item for a work queue. Each item has an ID and associated data.
- A string which should be prefixed to an identifier to generate a database key.
- A work queue backed by a redis database