redis_work_queue/lib.rs
1//! A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js
2//! (TypeScript) and Dotnet (C#).
3//!
//! This is the Rust implementation. For an overview of how the work queue works, its
//! limitations, and the general concepts and implementations in other languages, please read the
6//! [redis-work-queue readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md).
7//!
8//! ## Setup
9//!
10//! ```rust
11//! # async fn example() -> redis::RedisResult<()> {
12//! use redis_work_queue::{Item, KeyPrefix, WorkQueue};
13//!
14//! let host = "your-redis-server";
15//! let db = &mut redis::Client::open(format!("redis://{host}/"))?
16//! .get_async_connection()
17//! .await?;
18//!
19//! let work_queue = WorkQueue::new(KeyPrefix::from("example_work_queue"));
20//! # Ok(())
21//! # }
22//! ```
23//!
24//! ## Adding work
25//!
26//! ### Creating `Item`s
27//!
28//! ```rust
29//! use redis_work_queue::Item;
30//!
31//! // Create an item from `Box<[u8]>`
32//! let box_item = Item::new(Box::new(*b"[1,2,3]"));
33//!
34//! // Create an item from a `String`
35//! let string_item = Item::from_string_data("[1,2,3]".to_string());
36//!
37//! // Create an item from a serializable type
38//! let json_item = Item::from_json_data(&[1, 2, 3]).unwrap();
39//!
40//! assert_eq!(box_item.data, string_item.data);
41//! assert_eq!(box_item.data, json_item.data);
42//!
43//! // Parse an Item's data as json:
44//! assert_eq!(box_item.data_json::<Vec<u32>>().unwrap(), vec![1, 2, 3]);
45//! ```
46//!
47//! ### Add an item to a work queue
48//! ```rust
49//! # use redis::{AsyncCommands, RedisResult};
50//! # use redis_work_queue::{Item, KeyPrefix, WorkQueue};
51//! # async fn add_item<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue, item: Item) -> redis::RedisResult<()> {
52//! work_queue.add_item(db, &item).await.expect("failed to add item to work queue");
53//! # Ok(())
54//! # }
55//! ```
56//!
57//! ## Completing work
58//!
59//! Please read [the documentation on leasing and completing
60//! items](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item).
61//!
62//! ```rust
63//! use std::time::Duration;
64//!
65//! use redis::{AsyncCommands, RedisResult};
66//! use redis_work_queue::{Item, WorkQueue};
67//!
68//! # fn do_some_work(_: &Item) {}
69//! pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
70//! loop {
71//! // Wait for a job with no timeout and a lease time of 5 seconds.
72//! let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
73//! do_some_work(&job);
//! work_queue.complete(db, &job).await?;
75//! }
76//! }
77//! ```
78//!
79//! ### Handling errors
80//!
81//! Please read [the documentation on handling
82//! errors](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#handling-errors).
83//!
84//! ```rust
85//! use std::time::Duration;
86//!
87//! use redis::{AsyncCommands, RedisResult};
88//! use redis_work_queue::{Item, WorkQueue};
89//!
90//! # struct ExampleError {
91//! # should_retry: bool,
92//! # }
93//! # impl ExampleError {
94//! # fn should_retry(&self) -> bool {
95//! # self.should_retry
96//! # }
97//! # }
98//! # fn do_some_work(_: &Item) -> Result<(), ExampleError> { Ok(()) }
99//! # fn log_error(_: ExampleError) {}
100//! pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
101//! loop {
102//! // Wait for a job with no timeout and a lease time of 5 seconds.
103//! let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
104//! match do_some_work(&job) {
105//! // Mark successful jobs as complete
106//! Ok(()) => {
107//! work_queue.complete(db, &job).await?;
108//! }
109//! // Drop a job that should be retried - it will be returned to the work queue after
110//! // the (5 second) lease expires.
111//! Err(err) if err.should_retry() => (),
112//! // Errors that shouldn't cause a retry should mark the job as complete so it isn't
113//! // tried again.
114//! Err(err) => {
115//! log_error(err);
116//! work_queue.complete(db, &job).await?;
117//! }
118//! }
119//! }
120//! }
121//! ```
122
123use std::future::Future;
124use std::time::Duration;
125
126use redis::{AsyncCommands, RedisResult};
127use serde::{Deserialize, Serialize};
128use uuid::Uuid;
129
/// A reusable string prefix for building database keys.
///
/// A key is produced by placing the prefix directly in front of an identifier; no separator is
/// inserted, so include one in the prefix if it is wanted.
///
/// ### Example
///
/// ```rust
/// use redis_work_queue::KeyPrefix;
///
/// let cv_key = KeyPrefix::from("cv:");
/// // ...
/// let cv_id = "abcdef-123456";
/// assert_eq!(cv_key.of(cv_id), "cv:abcdef-123456");
/// // let cv_info = db.get(cv_key.of(cv_id));
/// ```
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct KeyPrefix {
    /// The text placed in front of every name passed to [`KeyPrefix::of`].
    prefix: String,
}

impl KeyPrefix {
    /// Wraps `prefix` in a `KeyPrefix`.
    pub fn new(prefix: String) -> KeyPrefix {
        Self { prefix }
    }

    /// Builds a key: the prefix immediately followed by `name`.
    pub fn of(&self, name: &str) -> String {
        [self.prefix.as_str(), name].concat()
    }

    /// Builds a longer `KeyPrefix` made of this prefix followed by `other`.
    ///
    /// `self` is only borrowed; use [`KeyPrefix::concat`] to consume it instead.
    pub fn and(&self, other: &str) -> KeyPrefix {
        Self {
            prefix: self.of(other),
        }
    }

    /// Builds a longer `KeyPrefix` made of this prefix followed by `other`.
    ///
    /// `self` is consumed and its string is extended in place; use [`KeyPrefix::and`] to keep
    /// the original.
    pub fn concat(self, other: &str) -> KeyPrefix {
        let KeyPrefix { mut prefix } = self;
        prefix += other;
        KeyPrefix { prefix }
    }
}

impl From<String> for KeyPrefix {
    fn from(prefix: String) -> KeyPrefix {
        KeyPrefix { prefix }
    }
}

impl From<&str> for KeyPrefix {
    fn from(prefix: &str) -> KeyPrefix {
        KeyPrefix {
            prefix: prefix.to_owned(),
        }
    }
}

impl From<KeyPrefix> for String {
    fn from(key_prefix: KeyPrefix) -> String {
        let KeyPrefix { prefix } = key_prefix;
        prefix
    }
}

impl AsRef<str> for KeyPrefix {
    fn as_ref(&self) -> &str {
        self.prefix.as_str()
    }
}
200
201/// An item for a work queue. Each item has an ID and associated data.
202#[derive(Clone, Debug)]
203pub struct Item {
204 pub id: String,
205 pub data: Box<[u8]>,
206}
207
208impl Item {
209 /// Create a new item, with the provided `data` and a random id (a uuid).
210 pub fn new(data: Box<[u8]>) -> Item {
211 Item {
212 data,
213 id: Uuid::new_v4().to_string(),
214 }
215 }
216
217 /// Create a new item, with the provided `data` and a random id (a uuid).
218 ///
219 /// The item's data is the output of `data.into_bytes().into_boxed_slice()`.
220 pub fn from_string_data(data: String) -> Item {
221 Item::new(data.into_bytes().into_boxed_slice())
222 }
223
224 /// Create a new item with a random id (a uuid). The data is the result of
225 /// `serde_json::to_vec(data)`.
226 pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
227 Ok(Item::new(serde_json::to_vec(data)?.into()))
228 }
229
230 /// Returns the data, parsed as JSON.
231 pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
232 serde_json::from_slice(&self.data)
233 }
234
235 /// Returns the data, parsed as JSON, with a static lifetime.
236 pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
237 serde_json::from_slice(&self.data)
238 }
239}
240
241/// A work queue backed by a redis database
242pub struct WorkQueue {
243 /// A unique ID for this instance
244 session: String,
245 /// The key for the list of items in the queue
246 main_queue_key: String,
247 /// The key for the list of items being processed
248 processing_key: String,
249 // TODO: Implement cleaning in the Rust library?
250 //cleaning_key: String,
251 /// The key prefix for lease entries
252 lease_key: KeyPrefix,
253 /// The key for item data entries
254 item_data_key: KeyPrefix,
255}
256
257impl WorkQueue {
258 pub fn new(name: KeyPrefix) -> WorkQueue {
259 WorkQueue {
260 session: Uuid::new_v4().to_string(),
261 main_queue_key: name.of(":queue"),
262 processing_key: name.of(":processing"),
263 //cleaning_key: name.of(":cleaning"),
264 lease_key: name.and(":lease:"),
265 item_data_key: name.and(":item:"),
266 }
267 }
268
269 /// Add an item to the work queue.
270 ///
271 /// If an item with the same ID already exists, this item is not added, and `false` is returned. Otherwise, if the item is added `true` is returned.
272 ///
273 /// If you know the item ID is unique, and not already in the queue, use the optimised
274 /// [`WorkQueue::add_unique_item`] instead.
275 pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
276 let added = db
277 .set_nx(self.item_data_key.of(&item.id), item.data.as_ref())
278 .await?;
279 if added {
280 db.lpush(&self.main_queue_key, &item.id).await?;
281 }
282 Ok(added)
283 }
284
285 /// Add an item, which is known to have an ID not already in the queue, to the work queue. This
286 /// adds the redis commands onto the pipeline passed.
287 ///
288 /// Use [`WorkQueue::add_unique_item`] if you don't want to pass a pipeline directly.
289 pub fn add_unique_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
290 // Add the item data
291 // NOTE: it's important that the data is added first, otherwise someone could pop the item
292 // before the data is ready
293 pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
294 // Then add the id to the work queue
295 pipeline.lpush(&self.main_queue_key, &item.id);
296 }
297
298 /// Add an item, which is known to have an ID not already in the queue, to the work queue.
299 ///
300 /// This creates a pipeline and executes it on the database.
301 pub async fn add_unique_item<C: AsyncCommands>(
302 &self,
303 db: &mut C,
304 item: &Item,
305 ) -> RedisResult<()> {
306 let mut pipeline = Box::new(redis::pipe());
307 self.add_unique_item_to_pipeline(&mut pipeline, item);
308 pipeline.query_async(db).await
309 }
310
311 /// Return the length of the work queue (not including items being processed, see
312 /// [`WorkQueue::processing`]).
313 pub fn queue_len<'a, C: AsyncCommands>(
314 &'a self,
315 db: &'a mut C,
316 ) -> impl Future<Output = RedisResult<usize>> + 'a {
317 db.llen(&self.main_queue_key)
318 }
319
320 /// Return the number of items being processed.
321 pub fn processing<'a, C: AsyncCommands>(
322 &'a self,
323 db: &'a mut C,
324 ) -> impl Future<Output = RedisResult<usize>> + 'a {
325 db.llen(&self.processing_key)
326 }
327
328 /// Request a work lease the work queue. This should be called by a worker to get work to
329 /// complete. When completed, the `complete` method should be called.
330 ///
331 /// The function will return either when a job is leased or after `timeout` if `timeout`
332 /// isn't `None`.
333 ///
334 /// If the job is not completed (by calling [`WorkQueue::complete`]) before the end of
335 /// `lease_duration`, another worker may pick up the same job. It is not a problem if a job is
336 /// marked as `done` more than once.
337 ///
338 /// If you've not already done it, it's worth reading [the documentation on leasing
339 /// items](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item).
340 pub async fn lease<C: AsyncCommands>(
341 &self,
342 db: &mut C,
343 timeout: Option<Duration>,
344 lease_duration: Duration,
345 ) -> RedisResult<Option<Item>> {
346 loop {
347 // First, to get an item, we try to move an item from the main queue to the processing list.
348 let Some(item_id): Option<String> = (match timeout {
349 Some(Duration::ZERO) => {
350 db.rpoplpush(&self.main_queue_key, &self.processing_key)
351 .await?
352 }
353 _ => {
354 db.brpoplpush(
355 &self.main_queue_key,
356 &self.processing_key,
357 timeout.map(|d| d.as_secs() as f64).unwrap_or(0.),
358 )
359 .await?
360 }
361 }) else {
362 return Ok(None);
363 };
364
365 // If we got an item, fetch the associated data.
366 let item_data: Vec<u8> = match db.get(self.item_data_key.of(&item_id)).await? {
367 Some(item_data) => item_data,
368 // If the item doesn't actually exist, and there's no timeout, just try again.
369 None if timeout == None => continue,
370 // If there was a timeout, we return early.
371 None => return Ok(None),
372 };
373
374 // Now setup the lease item.
375 // NOTE: Racing for a lease is ok
376 db.set_ex(
377 self.lease_key.of(&item_id),
378 &self.session,
379 lease_duration.as_secs(),
380 )
381 .await?;
382
383 return Ok(Some(Item {
384 data: item_data.into_boxed_slice(),
385 id: item_id,
386 }));
387 }
388 }
389
390 /// Marks a job as completed and remove it from the work queue. After `complete` has been called
391 /// (and returns `true`), no workers will receive this job again.
392 ///
393 /// `complete` returns a boolean indicating if *the job has been removed* **and** *this worker
394 /// was the first worker to call `complete`*. So, while lease might give the same job to
395 /// multiple workers, complete will return `true` for only one worker.
396 pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
397 // If we did actually remove it, delete the item data and lease.
398 // If we didn't really remove it, it's probably been returned to the work queue so the
399 // data is still needed and the lease might not be ours (if it is still ours, it'll
400 // expire anyway).
401 let (items_deleted, (), ()): (usize, (), ()) = redis::pipe()
402 .del(self.item_data_key.of(&item.id))
403 .lrem(&self.processing_key, 0, &item.id)
404 .del(self.lease_key.of(&item.id))
405 .query_async(db)
406 .await?;
407 Ok(items_deleted > 0)
408 }
409}
410
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};

    use super::{Item, KeyPrefix};

    /// Checks prefix concatenation, equality and the string conversions of `KeyPrefix`.
    #[test]
    fn test_key_prefix() {
        let base = KeyPrefix::new("abc".to_string());
        let extended = base.and("123");
        let expected = KeyPrefix::new("abc123".to_string());

        assert_eq!(extended, expected);
        assert_ne!(base, extended);
        assert_eq!(extended.as_ref(), expected.as_ref());
        assert_eq!(base.as_ref(), "abc");
        assert_eq!(base.of("bar"), "abcbar");
        assert_eq!(String::from(base.and("foo")), "abcfoo".to_string());
        assert_eq!(base.of("foo"), "abcfoo".to_string());
        assert_eq!(base.and("foo").of("bar"), "abcfoobar".to_string());
    }

    /// Checks that items round-trip JSON data and are given distinct, uuid-length IDs.
    #[test]
    fn test_item_json() {
        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
        struct Test {
            #[serde(default)]
            n: usize,
            s: String,
        }

        // Every generated ID is expected to be as long as this sample.
        let id_len = "00112233-4455-6677-8899-aabbccddeeff".len();

        let foo = Test {
            n: 7,
            s: "foo".to_string(),
        };
        let bar = Test {
            n: 8,
            s: "bar".to_string(),
        };
        // `n` is absent from the raw JSON used for this value below, so it takes its default.
        let baz = Test {
            n: 0,
            s: "baz".to_string(),
        };

        let foo_item = Item::from_json_data(&foo).unwrap();
        assert_eq!(foo, foo_item.data_json().unwrap());

        let bar_item = Item::from_json_data(&bar).unwrap();
        assert_eq!(bar_item.id.len(), id_len);

        let baz_item = Item::new(
            String::from("{\"s\":\"baz\"}")
                .into_bytes()
                .into_boxed_slice(),
        );
        assert_eq!(baz_item.id.len(), id_len);

        assert_ne!(bar_item.id, baz_item.id);
        assert_ne!(bar_item.data, baz_item.data);
        assert_ne!(
            bar_item.data_json::<Test>().unwrap(),
            baz_item.data_json().unwrap()
        );
        assert_eq!(
            bar_item.data_json::<Test>().unwrap(),
            bar_item.data_json().unwrap()
        );
        assert_eq!(bar_item.data_json::<Test>().unwrap(), bar);
        assert_eq!(baz_item.data_json::<Test>().unwrap(), baz);
    }
}