redis_objects/queue.rs

//! Objects for using lists and sorted sets in redis as queues.

use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use redis::AsyncCommands;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tracing::instrument;

use crate::ErrorTypes;

use super::{RedisObjects, retry_call};

/// A FIFO queue.
///
/// Optionally has a server-side time to live.
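///
/// Usage sketch (illustrative; assumes a `Queue<String>` named `queue` has already been
/// obtained from a [`RedisObjects`] handle, which is outside the scope of this file):
///
/// ```ignore
/// queue.push(&"job-1".to_string()).await?;
/// assert_eq!(queue.length().await?, 1);
/// if let Some(item) = queue.pop().await? {
///     // items are deserialized back into the queue's message type
///     println!("popped {item}");
/// }
/// ```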
pub struct Queue<T: Serialize + DeserializeOwned> {
    raw: RawQueue,
    _data: PhantomData<T>
}

impl<T: Serialize + DeserializeOwned> std::fmt::Debug for Queue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Queue").field("name", &self.raw.name).finish()
    }
}

impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
    fn clone(&self) -> Self {
        Self { raw: self.raw.clone(), _data: self._data }
    }
}

impl<T: Serialize + DeserializeOwned> Queue<T> {
    pub(crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
        Self {
            raw: RawQueue::new(name, store, ttl),
            _data: PhantomData,
        }
    }

    /// Get a reference to the server/object collection holding this queue
    pub fn host(&self) -> Arc<RedisObjects> {
        self.raw.store.clone()
    }

    /// enqueue a single item
    pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
        self.raw.push(&serde_json::to_vec(data)?).await
    }

    /// enqueue a sequence of items
    pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
        let data: Result<Vec<Vec<u8>>, _> = data.iter()
            .map(|item| serde_json::to_vec(item))
            .collect();
        self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
    }

    /// Put a message back at the head of the FIFO queue.
    pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
        self.raw.unpop(&serde_json::to_vec(data)?).await
    }

    /// Read the number of items in the queue
    pub async fn length(&self) -> Result<usize, ErrorTypes> {
        self.raw.length().await
    }

    /// load the item that would be returned by the next call to pop
    pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
        Ok(match self.raw.peek_next().await? {
            Some(value) => Some(serde_json::from_slice(&value)?),
            None => None
        })
    }

    /// Load the entire content of the queue into memory
    pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
        let response: Vec<Vec<u8>> = self.raw.content().await?;
        let mut out = vec![];
        for data in response {
            out.push(serde_json::from_slice(&data)?);
        }
        Ok(out)
    }

    /// Clear all data for this object
    pub async fn delete(&self) -> Result<(), ErrorTypes> {
        self.raw.delete().await
    }

    /// dequeue an item from the front of the queue, returning immediately if empty
    pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
        Ok(match self.raw.pop().await? {
            Some(value) => Some(serde_json::from_slice(&value)?),
            None => None
        })
    }

    /// Make a blocking pop call with a timeout
    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
        Ok(match self.raw.pop_timeout(timeout).await? {
            Some(value) => Some(serde_json::from_slice(&value)?),
            None => None
        })
    }

    /// Pop as many items as possible up to a certain limit
    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
        let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
        let mut out = vec![];
        for data in response {
            out.push(serde_json::from_slice(&data)?);
        }
        Ok(out)
    }

    /// Wait for up to the given timeout for any of the given queues to receive a value
    pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
        let queues: Vec<&RawQueue> = queues.iter().map(|queue| &queue.raw).collect();
        let response = RawQueue::select(&queues, timeout).await?;
        Ok(match response {
            Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
            None => None,
        })
    }

    /// access the untyped raw queue underlying the typed Queue object
    pub fn raw(&self) -> RawQueue {
        self.raw.clone()
    }
}

/// A FIFO queue of raw byte payloads.
///
/// Optionally has a server-side time to live.
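///
/// Sketch (illustrative; assumes a `RawQueue` named `raw`, for example taken from
/// [`Queue::raw`]):
///
/// ```ignore
/// raw.push(b"payload").await?;
/// let next: Option<Vec<u8>> = raw.pop().await?;
/// ```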
#[derive(Clone)]
pub struct RawQueue {
    name: String,
    store: Arc<RedisObjects>,
    ttl: Option<Duration>,
    last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
}

impl std::fmt::Debug for RawQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RawQueue").field("name", &self.name).field("store", &self.store).finish()
    }
}

impl RawQueue {
    pub(crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
        Self {
            name,
            store,
            ttl,
            last_expire_time: Arc::new(std::sync::Mutex::new(None)),
        }
    }

    /// set the expiry on the queue if it has not been recently set
    async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
        // only do anything if this object has a ttl set
        if let Some(ttl) = self.ttl {
            {
                // the last expire time is behind a mutex so that the queue object is threadsafe
                let mut last_expire_time = self.last_expire_time.lock().unwrap();

                // figure out if it's time to update the expiry; wait until we are 25% through the
                // ttl to avoid resetting something only milliseconds old
                // (with a 60 second ttl, for example, the expiry is refreshed at most every 15 seconds)
                if let Some(time) = *last_expire_time {
                    if time.elapsed() < (ttl / 4) {
                        return Ok(())
                    }
                };

                // update the time in the mutex then drop it so we aren't holding the lock
                // while we make the call to the redis server
                *last_expire_time = Some(std::time::Instant::now());
            }
            let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
        }
        Ok(())
    }

    /// enqueue a single item
    #[instrument(skip(data))]
    pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
        let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
        self.conditional_expire().await
    }

    /// enqueue a sequence of items
    #[instrument(skip(data))]
    pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
        let mut pipe = redis::pipe();
        for item in data {
            pipe.rpush(&self.name, item);
        }
        let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
        self.conditional_expire().await
    }

    /// Put a message back at the head of the FIFO queue.
    #[instrument(skip(data))]
    pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
        let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
        self.conditional_expire().await
    }

    /// Read the number of items in the queue
    #[instrument]
    pub async fn length(&self) -> Result<usize, ErrorTypes> {
        retry_call!(self.store.pool, llen, &self.name)
    }

    /// load the item that would be returned by the next call to pop
    #[instrument]
    pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
        let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
        Ok(response.into_iter().next())
    }

    /// Load the entire content of the queue into memory
    #[instrument]
    pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
        Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
    }

    /// Clear all data for this object
    #[instrument]
    pub async fn delete(&self) -> Result<(), ErrorTypes> {
        retry_call!(self.store.pool, del, &self.name)
    }

    /// dequeue an item from the front of the queue, returning immediately if empty
    #[instrument]
    pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
        Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
    }

    /// Make a blocking pop call with a timeout
    #[instrument]
    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
        let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
        Ok(response.map(|(_, data)| data))
    }

    /// Pop as many items as possible up to a certain limit
    #[instrument]
    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
        let limit = match NonZeroUsize::new(limit) {
            Some(value) => value,
            None => return Ok(Default::default()),
        };
        Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
    }

    /// Wait for up to the given timeout for any of the given queues to receive a value
    #[instrument]
    pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
        let timeout = timeout.unwrap_or_default().as_secs_f64();
        if queues.is_empty() {
            return Ok(None)
        }

        let store = &queues[0].store;
        let mut names = vec![];
        for queue in queues {
            names.push(queue.name.as_str())
        }
        Ok(retry_call!(store.pool, blpop, &names, timeout)?)
    }
}

/// Workaround for the inconsistency between ZRANGEBYSCORE and ZREMRANGEBYSCORE
///   (ZREMRANGEBYSCORE has no LIMIT option, or we would just be using that directly)
///
/// args:
///   minimum score to pop
///   maximum score to pop
///   number of elements to skip before popping any
///   max element count to pop
const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
local unpack = table.unpack or unpack
local min_score = tonumber(ARGV[1]);
if min_score == nil then min_score = -math.huge end
local max_score = tonumber(ARGV[2]);
if max_score == nil then max_score = math.huge end
local rem_offset = tonumber(ARGV[3]);
local rem_limit = tonumber(ARGV[4]);

local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
return entries
"#;

/// The length of the sorting prefix added to entries in the priority queue:
/// one vip digit plus a zero padded 20 digit microsecond timestamp
const SORTING_KEY_LEN: usize = 21;

/// A priority queue implemented on a redis sorted set
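///
/// Scores are stored negated, so the highest priority item is popped first.
///
/// Usage sketch (illustrative; assumes a `PriorityQueue<String>` named `pq` obtained
/// from a [`RedisObjects`] handle):
///
/// ```ignore
/// pq.push(10.0, &"important".to_string()).await?;
/// pq.push(1.0, &"routine".to_string()).await?;
/// let first = pq.pop(1).await?; // yields ["important"]
/// ```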
pub struct PriorityQueue<T> {
    name: String,
    store: Arc<RedisObjects>,
    dequeue_range: redis::Script,
    _data: PhantomData<T>,
}

impl<T> std::fmt::Debug for PriorityQueue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PriorityQueue").field("name", &self.name).field("store", &self.store).finish()
    }
}

impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
    pub(crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
        Self {
            name,
            store,
            dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
            _data: PhantomData,
        }
    }

    /// get key name used for this queue
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
        // the vip flag is currently hard coded off; the prefix digit (0 for vip, 9 otherwise)
        // would let vip entries sort ahead of others that share the same score
        let vip = false;
        let vip = if vip { 0 } else { 9 };

        let now = chrono::Utc::now().timestamp_micros();
        let data = serde_json::to_string(&item)?;

        // equivalent python expression: f"{vip}{f'{int(time.time()*1000000):020}'}{json.dumps(data)}"
        Ok(format!("{vip}{now:020}{data}").into_bytes())
    }

    fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
        Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
    }
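    // Layout illustration: encoding the string "x" at timestamp 1_700_000_000_000_000 µs
    // produces b"9" + b"00001700000000000000" + b"\"x\"", i.e. a 21 byte sorting prefix
    // (SORTING_KEY_LEN) followed by the JSON payload, which decode() strips back off.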

    /// Return the number of items within the two priority values (inclusive on both ends)
    #[instrument]
    pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
        // scores are stored negated, so the bounds are negated and swapped here
        Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
    }

    /// Remove all the data from this queue
    #[instrument]
    pub async fn delete(&self) -> Result<(), ErrorTypes> {
        retry_call!(self.store.pool, del, &self.name)
    }

    /// Get the number of items in the queue
    #[instrument]
    pub async fn length(&self) -> Result<u64, ErrorTypes> {
        retry_call!(self.store.pool, zcard, &self.name)
    }

    /// Remove items from the front of the queue
    #[instrument]
    pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
        if num <= 0 {
            return Ok(Default::default())
        };
        let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
        let mut out = vec![];
        for (data, _priority) in items {
            out.push(Self::decode(&data)?);
        }
        Ok(out)
    }

    /// When only one item is requested, blocking is possible.
    #[instrument]
    pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
        let result: Option<(String, Vec<u8>, f64)> = if low_priority {
            retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
        } else {
            retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
        };
        match result {
            Some(result) => Ok(Some(Self::decode(&result.1)?)),
            None => Ok(None)
        }
    }

    /// Dequeue a number of elements, within a specified range of scores.
    /// Limits given are inclusive, can be made exclusive, see redis docs on how to format limits for that.
    /// NOTE: the lower/upper limits are negated and swapped internally before the lua script runs, so callers pass plain priority values.
    /// :param lower_limit: The score of all dequeued elements must be higher or equal to this.
    /// :param upper_limit: The score of all dequeued elements must be lower or equal to this.
    /// :param skip: In the range of available items to dequeue skip over this many.
    /// :param num: Maximum number of elements to dequeue.
    #[instrument]
    pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
        let skip = skip.unwrap_or(0);
        let num = num.unwrap_or(1);
        let mut call = self.dequeue_range.key(&self.name);

        // scores are stored negated, so negate the limits and swap which bound is which
        let inner_lower = match upper_limit {
            Some(value) => -value,
            None => i64::MIN,
        };
        let inner_upper = match lower_limit {
            Some(value) => -value,
            None => i64::MAX,
        };

        let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
        let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
        results.iter()
            .map(|row| Self::decode(row))
            .collect()
    }
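    // Illustrative call (assuming a queue handle named `pq`): dequeue up to ten items
    // whose priority is at least 100, skipping none:
    //     let items = pq.dequeue_range(Some(100), None, None, Some(10)).await?;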

    /// Place an item into the queue
    #[instrument(skip(data))]
    pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
        let value = Self::encode(data)?;
        // the score is negated so that zpopmin yields the highest priority item first
        if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
            Ok(value)
        } else {
            Err(ErrorTypes::UnknownRedisError)
        }
    }

    /// Given the raw encoding of an item in queue get its position
    #[instrument(skip(raw_value))]
    pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
        retry_call!(self.store.pool, zrank, &self.name, raw_value)
    }

    /// Remove a specific item from the queue based on its raw value
    #[instrument(skip(raw_value))]
    pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
        let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
        Ok(count >= 1)
    }

    /// Pop items from the low priority end of the queue
    #[instrument]
    pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
        if num <= 0 {
            return Ok(Default::default())
        };
        let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
        let mut out = vec![];
        for (data, _priority) in items {
            out.push(Self::decode(&data)?);
        }
        Ok(out)
    }

    /// Pop the first item from any of the given queues within the given timeout
    #[instrument]
    pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
        if queues.is_empty() {
            return Ok(Default::default())
        }

        let timeout = timeout.unwrap_or_default().as_secs_f64();
        let mut names = vec![];
        for queue in queues {
            names.push(queue.name.as_str());
        }
        let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, timeout)?;

        Ok(match response {
            Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
            None => None,
        })
    }

    /// Utility function for batch reading queue lengths.
    #[instrument]
    pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
        if queues.is_empty() {
            return Ok(Default::default())
        }

        let mut pipe = redis::pipe();
        for queue in queues {
            pipe.zcard(&queue.name);
        }

        Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
    }
}

/// Object representing a collection of simple queues with the same prefix and message type.
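///
/// Usage sketch (illustrative; assumes a `MultiQueue<String>` named `queues` obtained
/// from a [`RedisObjects`] handle). Each call addresses the sub-queue named `prefix + name`:
///
/// ```ignore
/// queues.push("alpha", &"first".to_string()).await?;
/// let item = queues.pop("alpha", Duration::from_secs(1)).await?;
/// assert!(queues.pop_nonblocking("beta").await?.is_none());
/// ```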
pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
    store: Arc<RedisObjects>,
    prefix: String,
    _data: PhantomData<Message>,
}

impl<Message: Serialize + DeserializeOwned> std::fmt::Debug for MultiQueue<Message> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MultiQueue").field("store", &self.store).field("prefix", &self.prefix).finish()
    }
}

impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
    pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
        Self {store, prefix, _data: Default::default()}
    }

    /// Delete one of the queues
    #[instrument]
    pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
        retry_call!(self.store.pool, del, self.prefix.clone() + name)
    }

    /// Get the length of one of the queues
    #[instrument]
    pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
        retry_call!(self.store.pool, llen, self.prefix.clone() + name)
    }

    /// Pop from one of the queues, returning asap if no values are available.
    #[instrument]
    pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
        let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
        match result {
            Some(result) => Ok(serde_json::from_str(&result)?),
            None => Ok(None)
        }
    }

    /// Pop from one of the queues, wait up to `timeout` if no values are available.
    #[instrument]
    pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
        let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
        match result {
            Some((_, result)) => Ok(serde_json::from_str(&result)?),
            None => Ok(None),
        }
    }

    /// Insert an item into one of the queues
    #[instrument(skip(message))]
    pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
        let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
        Ok(())
    }
}