1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
use std::future::Future;
use std::time::Duration;

use redis::{AsyncCommands, RedisResult};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// A string which should be prefixed to an identifier to generate a database key.
///
/// ### Example
///
/// ```rust,ignore
/// let cv_key = KeyPrefix::new("cv:");
/// // ...
/// let cv_id = "abcdef-123456";
/// let cv_info = db.get(cv_key.of(cv_id));
/// ```
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct KeyPrefix {
    // The raw prefix string; prepended to identifiers by `KeyPrefix::of`.
    prefix: String,
}

impl KeyPrefix {
    /// Wraps `prefix` as a `KeyPrefix`.
    pub fn new(prefix: String) -> KeyPrefix {
        KeyPrefix { prefix }
    }

    /// Returns the result of prefixing `self` onto `name`.
    pub fn of(&self, name: &str) -> String {
        // `concat` computes the exact capacity up front, then copies both parts.
        [self.prefix.as_str(), name].concat()
    }

    /// Returns the result of prefixing `self` onto `other` as a new `KeyPrefix`.
    ///
    /// This is like [`KeyPrefix::concat`] except it only borrows `self`.
    pub fn and(&self, other: &str) -> KeyPrefix {
        self.clone().concat(other)
    }

    /// Returns the result of prefixing `self` onto `other` as a new `KeyPrefix`.
    ///
    /// This is like [`KeyPrefix::and`] except it moves `self`.
    pub fn concat(mut self, other: &str) -> KeyPrefix {
        self.prefix += other;
        self
    }
}

impl Into<String> for KeyPrefix {
    fn into(self) -> String {
        self.prefix
    }
}

impl AsRef<str> for KeyPrefix {
    /// Borrows the prefix as a string slice.
    fn as_ref(&self) -> &str {
        self.prefix.as_str()
    }
}

/// An item for a work queue. Each item has an ID and associated data.
#[derive(Clone, Debug)]
pub struct Item {
    // Unique identifier; a random UUID when created via `Item::new`.
    pub id: String,
    // Opaque payload bytes; JSON when created via `Item::from_json_data`.
    pub data: Box<[u8]>,
}

impl Item {
    /// Create a new item with a random id (a uuid).
    pub fn new(data: Box<[u8]>) -> Item {
        let id = Uuid::new_v4().to_string();
        Item { id, data }
    }

    /// Create a new item with a random id (a uuid). The data is the result of
    /// `serde_json::to_vec(data)`.
    pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
        let bytes = serde_json::to_vec(data)?;
        Ok(Item::new(bytes.into_boxed_slice()))
    }

    /// Returns the data, parsed as JSON.
    pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
        serde_json::from_slice(self.data.as_ref())
    }

    /// Returns the data, parsed as JSON, with a static lifetime.
    pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
        serde_json::from_slice(self.data.as_ref())
    }
}

/// A work queue backed by a redis database
pub struct WorkQueue {
    /// A unique ID for this instance (a random UUID), stored as the value of lease keys
    session: String,
    /// The key for the list of items in the queue
    main_queue_key: String,
    /// The key for the list of items being processed
    processing_key: String,
    // TODO: Implement cleaning in the Rust library?
    //cleaning_key: String,
    /// The key prefix for lease entries (item id appended; entries expire after the lease duration)
    lease_key: KeyPrefix,
    /// The key prefix for item data entries (item id appended)
    item_data_key: KeyPrefix,
}

impl WorkQueue {
    /// Create a work queue rooted at the key prefix `name`, with a fresh random session ID.
    pub fn new(name: KeyPrefix) -> WorkQueue {
        WorkQueue {
            session: Uuid::new_v4().to_string(),
            main_queue_key: name.of(":queue"),
            processing_key: name.of(":processing"),
            //cleaning_key: name.of(":cleaning"),
            lease_key: name.and(":leased_by_session:"),
            item_data_key: name.and(":item:"),
        }
    }

    /// Add an item to the work queue. This adds the redis commands onto the pipeline passed.
    ///
    /// Use [`WorkQueue::add_item`] if you don't want to pass a pipeline directly.
    pub fn add_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
        // Add the item data
        // NOTE: it's important that the data is added first, otherwise someone could pop the item
        // before the data is ready
        pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
        // Then add the id to the work queue
        pipeline.lpush(&self.main_queue_key, &item.id);
    }

    /// Add an item to the work queue.
    ///
    /// This creates a pipeline and executes it on the database.
    pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<()> {
        // The pipeline is a small, short-lived local; no need to box it on the heap.
        let mut pipeline = redis::pipe();
        self.add_item_to_pipeline(&mut pipeline, item);
        pipeline.query_async(db).await
    }

    /// Return the length of the work queue (not including items being processed, see
    /// [`WorkQueue::processing`]).
    pub fn queue_len<'a, C: AsyncCommands>(
        &'a self,
        db: &'a mut C,
    ) -> impl Future<Output = RedisResult<usize>> + 'a {
        db.llen(&self.main_queue_key)
    }

    /// Return the number of items being processed.
    pub fn processing<'a, C: AsyncCommands>(
        &'a self,
        db: &'a mut C,
    ) -> impl Future<Output = RedisResult<usize>> + 'a {
        db.llen(&self.processing_key)
    }

    /// Request a work lease from the work queue. This should be called by a worker to get work to
    /// complete. When completed, the `complete` method should be called.
    ///
    /// The function will return either when a job is leased or after `timeout` if `timeout`
    /// isn't `None`.
    ///
    /// If the job is not completed (by calling [`WorkQueue::complete`]) before the end of
    /// `lease_duration`, another worker may pick up the same job. It is not a problem if a job is
    /// marked as `done` more than once.
    // TODO: Add non-blocking option.
    pub async fn lease<C: AsyncCommands>(
        &self,
        db: &mut C,
        timeout: Option<Duration>,
        lease_duration: Duration,
    ) -> RedisResult<Option<Item>> {
        // First, to get an item, we try to move an item from the main queue to the processing list.
        let item_id: Option<String> = match timeout {
            // A zero timeout means "don't block at all": use the non-blocking pop.
            Some(t) if t.is_zero() => {
                db.rpoplpush(&self.main_queue_key, &self.processing_key)
                    .await?
            }
            _ => {
                // BRPOPLPUSH takes whole seconds, where 0 means "block forever"
                // (that's what we pass for `timeout == None`). Round non-zero
                // sub-second timeouts *up* so that e.g. 500ms doesn't truncate
                // to 0 and accidentally block indefinitely.
                let timeout_secs = timeout
                    .map(|d| d.as_secs() as usize + usize::from(d.subsec_nanos() > 0))
                    .unwrap_or(0);
                db.brpoplpush(&self.main_queue_key, &self.processing_key, timeout_secs)
                    .await?
            }
        };

        // If we got an item, fetch the associated data.
        let item = match item_id {
            Some(item_id) => Item {
                data: db
                    .get::<_, Vec<u8>>(self.item_data_key.of(&item_id))
                    .await?
                    .into_boxed_slice(),
                id: item_id,
            },
            None => return Ok(None),
        };

        // Now setup the lease item.
        // NOTE: Racing for a lease is ok
        // NOTE(review): SETEX requires a positive expiry; a zero `lease_duration`
        // would make redis return an error — confirm callers always pass > 0.
        db.set_ex(
            self.lease_key.of(&item.id),
            &self.session,
            lease_duration.as_secs() as usize,
        )
        .await?;

        Ok(Some(item))
    }

    /// Mark a job as completed and remove it from the work queue.
    ///
    /// Returns `true` if this call removed the item from the processing list (and so
    /// also deleted its data and lease), `false` if it was no longer there.
    pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
        let removed: usize = db.lrem(&self.processing_key, 0, &item.id).await?;
        if removed == 0 {
            return Ok(false);
        }
        // If we did actually remove it, delete the item data and lease.
        // If we didn't really remove it, it's probably been returned to the work queue so the
        // data is still needed and the lease might not be ours (if it is still ours, it'll
        // expire anyway).
        redis::pipe()
            .del(self.item_data_key.of(&item.id))
            .del(self.lease_key.of(&item.id))
            .query_async(db)
            .await?;
        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::{Item, KeyPrefix};

    use serde::{Deserialize, Serialize};

    /// Length of a canonical hyphenated UUID string, as produced by `Item::new`.
    const UUID_LEN: usize = "00112233-4455-6677-8899-aabbccddeeff".len();

    #[test]
    fn test_key_prefix() {
        let base = KeyPrefix::new("abc".to_string());
        let extended = base.and("123");
        let expected = KeyPrefix::new("abc123".to_string());

        // `and` concatenates into a fresh prefix without touching the original.
        assert_eq!(extended, expected);
        assert_ne!(base, extended);
        assert_eq!(extended.as_ref(), expected.as_ref());
        assert_eq!(base.as_ref(), "abc");

        // `of` builds a key by prepending the prefix onto the name.
        assert_eq!(base.of("bar"), "abcbar");
        assert_eq!(base.of("foo"), "abcfoo".to_string());
        assert_eq!(base.and("foo").of("bar"), "abcfoobar".to_string());

        // Converting into a `String` yields the raw prefix.
        assert_eq!(
            Into::<String>::into(base.and("foo")),
            "abcfoo".to_string()
        );
    }

    #[test]
    fn test_item_json() {
        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
        struct Test {
            #[serde(default)]
            n: usize,
            s: String,
        }

        let test_foo = Test {
            n: 7,
            s: "foo".to_string(),
        };
        let test_bar = Test {
            n: 8,
            s: "bar".to_string(),
        };
        let test_baz = Test {
            n: 0,
            s: "baz".to_string(),
        };

        // Round-trip: serialize into an item, then parse the payload back out.
        let round_tripped: Test = Item::from_json_data(&test_foo)
            .unwrap()
            .data_json()
            .unwrap();
        assert_eq!(test_foo, round_tripped);

        // Items get fresh UUID ids however they are constructed.
        let test_item_bar = Item::from_json_data(&test_bar).unwrap();
        assert_eq!(test_item_bar.id.len(), UUID_LEN);
        let test_item_baz = Item::new(
            "{\"s\":\"baz\"}"
                .to_string()
                .into_bytes()
                .into_boxed_slice(),
        );
        assert_eq!(test_item_baz.id.len(), UUID_LEN);
        assert_ne!(test_item_bar.id, test_item_baz.id);
        assert_ne!(test_item_bar.data, test_item_baz.data);

        // Parsing is deterministic, distinguishes payloads, and
        // `#[serde(default)]` fills in the missing `n` for the baz item.
        assert_ne!(
            test_item_bar.data_json::<Test>().unwrap(),
            test_item_baz.data_json().unwrap()
        );
        assert_eq!(
            test_item_bar.data_json::<Test>().unwrap(),
            test_item_bar.data_json().unwrap()
        );
        assert_eq!(test_item_bar.data_json::<Test>().unwrap(), test_bar);
        assert_eq!(test_item_baz.data_json::<Test>().unwrap(), test_baz);
    }
}