redis_work_queue/
lib.rs

1//! A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js
2//! (TypeScript) and Dotnet (C#).
3//!
//! This is the Rust implementation. For an overview of how the work queue works, its
//! limitations, and the general concepts and implementations in other languages, please read the
6//! [redis-work-queue readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md).
7//!
8//! ## Setup
9//!
10//! ```rust
11//! # async fn example() -> redis::RedisResult<()> {
12//! use redis_work_queue::{Item, KeyPrefix, WorkQueue};
13//!
14//! let host = "your-redis-server";
15//! let db = &mut redis::Client::open(format!("redis://{host}/"))?
16//!     .get_async_connection()
17//!     .await?;
18//!
19//! let work_queue = WorkQueue::new(KeyPrefix::from("example_work_queue"));
20//! # Ok(())
21//! # }
22//! ```
23//!
24//! ## Adding work
25//!
26//! ### Creating `Item`s
27//!
28//! ```rust
29//! use redis_work_queue::Item;
30//!
31//! // Create an item from `Box<[u8]>`
32//! let box_item = Item::new(Box::new(*b"[1,2,3]"));
33//!
34//! // Create an item from a `String`
35//! let string_item = Item::from_string_data("[1,2,3]".to_string());
36//!
37//! // Create an item from a serializable type
38//! let json_item = Item::from_json_data(&[1, 2, 3]).unwrap();
39//!
40//! assert_eq!(box_item.data, string_item.data);
41//! assert_eq!(box_item.data, json_item.data);
42//!
43//! // Parse an Item's data as json:
44//! assert_eq!(box_item.data_json::<Vec<u32>>().unwrap(), vec![1, 2, 3]);
45//! ```
46//!
47//! ### Add an item to a work queue
48//! ```rust
49//! # use redis::{AsyncCommands, RedisResult};
50//! # use redis_work_queue::{Item, KeyPrefix, WorkQueue};
51//! # async fn add_item<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue, item: Item) -> redis::RedisResult<()> {
52//! work_queue.add_item(db, &item).await.expect("failed to add item to work queue");
53//! # Ok(())
54//! # }
55//! ```
56//!
57//! ## Completing work
58//!
59//! Please read [the documentation on leasing and completing
60//! items](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item).
61//!
62//! ```rust
63//! use std::time::Duration;
64//!
65//! use redis::{AsyncCommands, RedisResult};
66//! use redis_work_queue::{Item, WorkQueue};
67//!
68//! # fn do_some_work(_: &Item) {}
69//! pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
70//!     loop {
71//!         // Wait for a job with no timeout and a lease time of 5 seconds.
72//!         let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
73//!         do_some_work(&job);
//!         work_queue.complete(db, &job).await?;
75//!     }
76//! }
77//! ```
78//!
79//! ### Handling errors
80//!
81//! Please read [the documentation on handling
82//! errors](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#handling-errors).
83//!
84//! ```rust
85//! use std::time::Duration;
86//!
87//! use redis::{AsyncCommands, RedisResult};
88//! use redis_work_queue::{Item, WorkQueue};
89//!
90//! # struct ExampleError {
91//! #     should_retry: bool,
92//! # }
93//! # impl ExampleError {
94//! #     fn should_retry(&self) -> bool {
95//! #         self.should_retry
96//! #     }
97//! # }
98//! # fn do_some_work(_: &Item) -> Result<(), ExampleError> { Ok(()) }
99//! # fn log_error(_: ExampleError) {}
100//! pub async fn work_loop<C: AsyncCommands>(db: &mut C, work_queue: WorkQueue) -> RedisResult<()> {
101//!     loop {
102//!         // Wait for a job with no timeout and a lease time of 5 seconds.
103//!         let job: Item = work_queue.lease(db, None, Duration::from_secs(5)).await?.unwrap();
104//!         match do_some_work(&job) {
105//!             // Mark successful jobs as complete
106//!             Ok(()) => {
107//!                 work_queue.complete(db, &job).await?;
108//!             }
109//!             // Drop a job that should be retried - it will be returned to the work queue after
110//!             // the (5 second) lease expires.
111//!             Err(err) if err.should_retry() => (),
112//!             // Errors that shouldn't cause a retry should mark the job as complete so it isn't
113//!             // tried again.
114//!             Err(err) => {
115//!                 log_error(err);
116//!                 work_queue.complete(db, &job).await?;
117//!             }
118//!         }
119//!     }
120//! }
121//! ```
122
123use std::future::Future;
124use std::time::Duration;
125
126use redis::{AsyncCommands, RedisResult};
127use serde::{Deserialize, Serialize};
128use uuid::Uuid;
129
/// A piece of text which is placed in front of an identifier to build a database key.
///
/// ### Example
///
/// ```rust
/// use redis_work_queue::KeyPrefix;
///
/// let cv_key = KeyPrefix::from("cv:");
/// // ...
/// let cv_id = "abcdef-123456";
/// assert_eq!(cv_key.of(cv_id), "cv:abcdef-123456");
/// // let cv_info = db.get(cv_key.of(cv_id));
/// ```
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct KeyPrefix {
    /// The text prepended to every name passed to this prefix.
    prefix: String,
}
147
148impl KeyPrefix {
149    pub fn new(prefix: String) -> KeyPrefix {
150        KeyPrefix { prefix }
151    }
152
153    /// Returns the result of prefixing `self` onto `name`.
154    pub fn of(&self, name: &str) -> String {
155        let mut key = String::with_capacity(self.prefix.len() + name.len());
156        key.push_str(&self.prefix);
157        key.push_str(name);
158        key
159    }
160
161    /// Returns the result of prefixing `self` onto `other` as a new `KeyPrefix`.
162    ///
163    /// This is like [`KeyPrefix::concat`] except it only borrows `self`.
164    pub fn and(&self, other: &str) -> KeyPrefix {
165        KeyPrefix::new(self.of(other))
166    }
167
168    /// Returns the result of prefixing `self` onto `other` as a new `KeyPrefix`.
169    ///
170    /// This is like [`KeyPrefix::and`] except it moves `self`.
171    pub fn concat(mut self, other: &str) -> KeyPrefix {
172        self.prefix.push_str(other);
173        self
174    }
175}
176
177impl From<String> for KeyPrefix {
178    fn from(prefix: String) -> KeyPrefix {
179        KeyPrefix::new(prefix)
180    }
181}
182
183impl From<&str> for KeyPrefix {
184    fn from(prefix: &str) -> KeyPrefix {
185        KeyPrefix::new(prefix.to_string())
186    }
187}
188
189impl From<KeyPrefix> for String {
190    fn from(key_prefix: KeyPrefix) -> String {
191        key_prefix.prefix
192    }
193}
194
195impl AsRef<str> for KeyPrefix {
196    fn as_ref(&self) -> &str {
197        &self.prefix
198    }
199}
200
/// A single unit of work for a work queue, made up of an ID and the data associated with it.
#[derive(Debug, Clone)]
pub struct Item {
    /// The identifier of this item.
    pub id: String,
    /// The raw bytes associated with this item.
    pub data: Box<[u8]>,
}
207
208impl Item {
209    /// Create a new item, with the provided `data` and a random id (a uuid).
210    pub fn new(data: Box<[u8]>) -> Item {
211        Item {
212            data,
213            id: Uuid::new_v4().to_string(),
214        }
215    }
216
217    /// Create a new item, with the provided `data` and a random id (a uuid).
218    ///
219    /// The item's data is the output of `data.into_bytes().into_boxed_slice()`.
220    pub fn from_string_data(data: String) -> Item {
221        Item::new(data.into_bytes().into_boxed_slice())
222    }
223
224    /// Create a new item with a random id (a uuid). The data is the result of
225    /// `serde_json::to_vec(data)`.
226    pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
227        Ok(Item::new(serde_json::to_vec(data)?.into()))
228    }
229
230    /// Returns the data, parsed as JSON.
231    pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
232        serde_json::from_slice(&self.data)
233    }
234
235    /// Returns the data, parsed as JSON, with a static lifetime.
236    pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
237        serde_json::from_slice(&self.data)
238    }
239}
240
241/// A work queue backed by a redis database
242pub struct WorkQueue {
243    /// A unique ID for this instance
244    session: String,
245    /// The key for the list of items in the queue
246    main_queue_key: String,
247    /// The key for the list of items being processed
248    processing_key: String,
249    // TODO: Implement cleaning in the Rust library?
250    //cleaning_key: String,
251    /// The key prefix for lease entries
252    lease_key: KeyPrefix,
253    /// The key for item data entries
254    item_data_key: KeyPrefix,
255}
256
257impl WorkQueue {
258    pub fn new(name: KeyPrefix) -> WorkQueue {
259        WorkQueue {
260            session: Uuid::new_v4().to_string(),
261            main_queue_key: name.of(":queue"),
262            processing_key: name.of(":processing"),
263            //cleaning_key: name.of(":cleaning"),
264            lease_key: name.and(":lease:"),
265            item_data_key: name.and(":item:"),
266        }
267    }
268
269    /// Add an item to the work queue.
270    ///
271    /// If an item with the same ID already exists, this item is not added, and `false` is returned. Otherwise, if the item is added `true` is returned.
272    ///
273    /// If you know the item ID is unique, and not already in the queue, use the optimised
274    /// [`WorkQueue::add_unique_item`] instead.
275    pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
276        let added = db
277            .set_nx(self.item_data_key.of(&item.id), item.data.as_ref())
278            .await?;
279        if added {
280            db.lpush(&self.main_queue_key, &item.id).await?;
281        }
282        Ok(added)
283    }
284
285    /// Add an item, which is known to have an ID not already in the queue, to the work queue. This
286    /// adds the redis commands onto the pipeline passed.
287    ///
288    /// Use [`WorkQueue::add_unique_item`] if you don't want to pass a pipeline directly.
289    pub fn add_unique_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
290        // Add the item data
291        // NOTE: it's important that the data is added first, otherwise someone could pop the item
292        // before the data is ready
293        pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
294        // Then add the id to the work queue
295        pipeline.lpush(&self.main_queue_key, &item.id);
296    }
297
298    /// Add an item, which is known to have an ID not already in the queue, to the work queue.
299    ///
300    /// This creates a pipeline and executes it on the database.
301    pub async fn add_unique_item<C: AsyncCommands>(
302        &self,
303        db: &mut C,
304        item: &Item,
305    ) -> RedisResult<()> {
306        let mut pipeline = Box::new(redis::pipe());
307        self.add_unique_item_to_pipeline(&mut pipeline, item);
308        pipeline.query_async(db).await
309    }
310
311    /// Return the length of the work queue (not including items being processed, see
312    /// [`WorkQueue::processing`]).
313    pub fn queue_len<'a, C: AsyncCommands>(
314        &'a self,
315        db: &'a mut C,
316    ) -> impl Future<Output = RedisResult<usize>> + 'a {
317        db.llen(&self.main_queue_key)
318    }
319
320    /// Return the number of items being processed.
321    pub fn processing<'a, C: AsyncCommands>(
322        &'a self,
323        db: &'a mut C,
324    ) -> impl Future<Output = RedisResult<usize>> + 'a {
325        db.llen(&self.processing_key)
326    }
327
328    /// Request a work lease the work queue. This should be called by a worker to get work to
329    /// complete. When completed, the `complete` method should be called.
330    ///
331    /// The function will return either when a job is leased or after `timeout` if `timeout`
332    /// isn't `None`.
333    ///
334    /// If the job is not completed (by calling [`WorkQueue::complete`]) before the end of
335    /// `lease_duration`, another worker may pick up the same job. It is not a problem if a job is
336    /// marked as `done` more than once.
337    ///
338    /// If you've not already done it, it's worth reading [the documentation on leasing
339    /// items](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item).
340    pub async fn lease<C: AsyncCommands>(
341        &self,
342        db: &mut C,
343        timeout: Option<Duration>,
344        lease_duration: Duration,
345    ) -> RedisResult<Option<Item>> {
346        loop {
347            // First, to get an item, we try to move an item from the main queue to the processing list.
348            let Some(item_id): Option<String> = (match timeout {
349                Some(Duration::ZERO) => {
350                    db.rpoplpush(&self.main_queue_key, &self.processing_key)
351                        .await?
352                }
353                _ => {
354                    db.brpoplpush(
355                        &self.main_queue_key,
356                        &self.processing_key,
357                        timeout.map(|d| d.as_secs() as f64).unwrap_or(0.),
358                    )
359                    .await?
360                }
361            }) else {
362                return Ok(None);
363            };
364
365            // If we got an item, fetch the associated data.
366            let item_data: Vec<u8> = match db.get(self.item_data_key.of(&item_id)).await? {
367                Some(item_data) => item_data,
368                // If the item doesn't actually exist, and there's no timeout, just try again.
369                None if timeout == None => continue,
370                // If there was a timeout, we return early.
371                None => return Ok(None),
372            };
373
374            // Now setup the lease item.
375            // NOTE: Racing for a lease is ok
376            db.set_ex(
377                self.lease_key.of(&item_id),
378                &self.session,
379                lease_duration.as_secs(),
380            )
381            .await?;
382
383            return Ok(Some(Item {
384                data: item_data.into_boxed_slice(),
385                id: item_id,
386            }));
387        }
388    }
389
390    /// Marks a job as completed and remove it from the work queue. After `complete` has been called
391    /// (and returns `true`), no workers will receive this job again.
392    ///
393    /// `complete` returns a boolean indicating if *the job has been removed* **and** *this worker
394    /// was the first worker to call `complete`*. So, while lease might give the same job to
395    /// multiple workers, complete will return `true` for only one worker.
396    pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
397        // If we did actually remove it, delete the item data and lease.
398        // If we didn't really remove it, it's probably been returned to the work queue so the
399        // data is still needed and the lease might not be ours (if it is still ours, it'll
400        // expire anyway).
401        let (items_deleted, (), ()): (usize, (), ()) = redis::pipe()
402            .del(self.item_data_key.of(&item.id))
403            .lrem(&self.processing_key, 0, &item.id)
404            .del(self.lease_key.of(&item.id))
405            .query_async(db)
406            .await?;
407        Ok(items_deleted > 0)
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::{Item, KeyPrefix};

    use serde::{Deserialize, Serialize};

    /// Length of a hyphenated uuid string, which is the length expected of generated item IDs.
    const UUID_LEN: usize = "00112233-4455-6677-8899-aabbccddeeff".len();

    /// Checks construction, extension, comparison and conversion of `KeyPrefix`.
    #[test]
    fn test_key_prefix() {
        let base = KeyPrefix::new(String::from("abc"));
        let extended = base.and("123");
        let expected = KeyPrefix::new(String::from("abc123"));

        // Extending a prefix gives a new, different prefix and leaves the original alone.
        assert_eq!(extended, expected);
        assert_ne!(base, extended);
        assert_eq!(extended.as_ref(), expected.as_ref());
        assert_eq!(base.as_ref(), "abc");

        // Keys and conversions are plain concatenations.
        assert_eq!(base.of("bar"), "abcbar");
        let as_string: String = base.and("foo").into();
        assert_eq!(as_string, "abcfoo".to_string());
        assert_eq!(base.of("foo"), "abcfoo".to_string());
        assert_eq!(base.and("foo").of("bar"), "abcfoobar".to_string());
    }

    /// Checks that item data round-trips through JSON and that items get uuid-length IDs.
    #[test]
    fn test_item_json() {
        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
        struct Test {
            #[serde(default)]
            n: usize,
            s: String,
        }

        let foo = Test {
            n: 7,
            s: String::from("foo"),
        };
        let bar = Test {
            n: 8,
            s: String::from("bar"),
        };
        let baz = Test {
            n: 0,
            s: String::from("baz"),
        };

        // Serializing then parsing gives back the original value.
        let foo_item = Item::from_json_data(&foo).unwrap();
        let foo_parsed: Test = foo_item.data_json().unwrap();
        assert_eq!(foo, foo_parsed);

        let bar_item = Item::from_json_data(&bar).unwrap();
        assert_eq!(bar_item.id.len(), UUID_LEN);

        // `n` is missing from this JSON, so it takes its default value when parsed.
        let baz_item = Item::new(br#"{"s":"baz"}"#.to_vec().into_boxed_slice());
        assert_eq!(baz_item.id.len(), UUID_LEN);

        // Separate items have separate IDs, and here separate data.
        assert_ne!(bar_item.id, baz_item.id);
        assert_ne!(bar_item.data, baz_item.data);

        let bar_parsed: Test = bar_item.data_json().unwrap();
        let bar_parsed_again: Test = bar_item.data_json().unwrap();
        let baz_parsed: Test = baz_item.data_json().unwrap();
        assert_ne!(bar_parsed, baz_parsed);
        assert_eq!(bar_parsed, bar_parsed_again);
        assert_eq!(bar_parsed, bar);
        assert_eq!(baz_parsed, baz);
    }
}