// redis_objects/queue.rs

1//! Objects for using lists and sorted sets in redis as queues.
2
3use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::AsyncCommands;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use tracing::instrument;
12
13use crate::ErrorTypes;
14
15use super::{RedisObjects, retry_call};
16
/// A typed FIFO queue backed by a redis list.
/// Optionally has a server side time to live
pub struct Queue<T: Serialize + DeserializeOwned> {
    // untyped queue that performs the actual redis operations
    raw: RawQueue,
    // ties the queue to its message type; stores no data
    _data: PhantomData<T>
}
23
24impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
25    fn clone(&self) -> Self {
26        Self { raw: self.raw.clone(), _data: self._data }
27    }
28}
29
30impl<T: Serialize + DeserializeOwned> Queue<T> {
31    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
32        Self {
33            raw: RawQueue::new(name, store, ttl),
34            _data: PhantomData,
35        }
36    }
37
38    /// Get a reference to the server/object collection holding this queue
39    pub fn host(&self) -> Arc<RedisObjects> {
40        self.raw.store.clone()
41    }
42
43    /// enqueue a single item
44    pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
45        self.raw.push(&serde_json::to_vec(data)?).await
46    }
47
48    /// enqueue a sequence of items
49    pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
50        let data: Result<Vec<Vec<u8>>, _> = data.iter()
51            .map(|item | serde_json::to_vec(item))
52            .collect();
53        self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
54    }
55
56    /// Put all messages passed back at the head of the FIFO queue.
57    pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
58        self.raw.unpop(&serde_json::to_vec(data)?).await
59    }
60
61    /// Read the number of items in the queue
62    pub async fn length(&self) -> Result<usize, ErrorTypes> {
63        self.raw.length().await
64    }
65
66    /// load the item that would be returned by the next call to pop
67    pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
68        Ok(match self.raw.peek_next().await? { 
69            Some(value) => Some(serde_json::from_slice(&value)?),
70            None => None
71        })
72    }
73
74    /// Load the entire content of the queue into memory
75    pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
76        let response: Vec<Vec<u8>> = self.raw.content().await?;
77        let mut out = vec![];
78        for data in response {
79            out.push(serde_json::from_slice(&data)?);
80        }
81        Ok(out)
82    }
83
84    /// Clear all data for this object
85    pub async fn delete(&self) -> Result<(), ErrorTypes> {
86        self.raw.delete().await
87    }
88
89    /// dequeue an item from the front of the queue, returning immediately if empty
90    pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
91        Ok(match self.raw.pop().await? { 
92            Some(value) => Some(serde_json::from_slice(&value)?),
93            None => None
94        })
95    }
96
97    /// Make a blocking pop call with a timeout
98    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
99        Ok(match self.raw.pop_timeout(timeout).await? { 
100            Some(value) => Some(serde_json::from_slice(&value)?),
101            None => None
102        })
103    }
104
105    /// Pop as many items as possible up to a certain limit
106    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
107        let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
108        let mut out = vec![];
109        for data in response {
110            out.push(serde_json::from_slice(&data)?);
111        }
112        Ok(out)
113    }
114
115    /// Wait for up to the given timeout for any of the given queues to recieve a value
116    pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
117        let queues: Vec<&RawQueue> = queues.iter().map(|queue|&queue.raw).collect();
118        let response = RawQueue::select(&queues, timeout).await?;
119        Ok(match response {
120            Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
121            None => None,
122        })
123    }
124
125    /// access the untyped raw queue underlying the typed Queue object
126    pub fn raw(&self) -> RawQueue {
127        self.raw.clone()
128    }
129}
130
/// An untyped FIFO queue backed by a redis list.
/// Optionally has a server side time to live
#[derive(Clone)]
pub struct RawQueue {
    // redis key the list is stored under
    name: String,
    // server/object collection this queue lives in
    store: Arc<RedisObjects>,
    // optional server side expiry, refreshed lazily by conditional_expire
    ttl: Option<Duration>,
    // when the expiry was last refreshed; shared behind a mutex so clones of
    // this queue stay threadsafe and agree on the refresh schedule
    last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
}
140
141impl std::fmt::Debug for RawQueue {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("RawQueue").field("name", &self.name).field("store", &self.store).finish()
144    }
145}
146
147impl RawQueue {
148    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
149        Self {
150            name,
151            store,
152            ttl,
153            last_expire_time: Arc::new(std::sync::Mutex::new(None)),
154        }
155    }
156
157    /// set the expiry on the queue if it has not been recently set
158    async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
159        // load the ttl of this object has one set
160        if let Some(ttl) = self.ttl {
161            {
162                // the last expire time is behind a mutex so that the queue object is threadsafe
163                let mut last_expire_time = self.last_expire_time.lock().unwrap();
164
165                // figure out if its time to update the expiry, wait until we are 25% through the
166                // ttl to avoid resetting something only milliseconds old
167                if let Some(time) = *last_expire_time {
168                    if time.elapsed() < (ttl / 4) {
169                        return Ok(())
170                    }
171                };
172
173                // update the time in the mutex then drop it so we aren't holding the lock 
174                // while we make the call to the redis server
175                *last_expire_time = Some(std::time::Instant::now());
176            }
177            let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
178        }
179        Ok(())
180    }
181
182    /// enqueue a single item
183    #[instrument(skip(data))]
184    pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
185        let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
186        self.conditional_expire().await
187    }
188
189    /// enqueue a sequence of items
190    #[instrument(skip(data))]
191    pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
192        let mut pipe = redis::pipe();
193        for item in data {
194            pipe.rpush(&self.name, item);
195        }
196        let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
197        self.conditional_expire().await
198    }
199
200    /// Put all messages passed back at the head of the FIFO queue.
201    #[instrument(skip(data))]
202    pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
203        let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
204        self.conditional_expire().await
205    }
206
207    /// Read the number of items in the queue
208    #[instrument]
209    pub async fn length(&self) -> Result<usize, ErrorTypes> {
210        retry_call!(self.store.pool, llen, &self.name)
211    }
212
213    /// load the item that would be returned by the next call to pop
214    #[instrument]
215    pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
216        let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
217        Ok(response.into_iter().nth(0))
218    }
219
220    /// Load the entire content of the queue into memory
221    #[instrument]
222    pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
223        Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
224    }
225
226    /// Clear all data for this object
227    #[instrument]
228    pub async fn delete(&self) -> Result<(), ErrorTypes> {
229        retry_call!(self.store.pool, del, &self.name)
230    }
231
232    /// dequeue an item from the front of the queue, returning immediately if empty
233    #[instrument]
234    pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
235        Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
236    }
237
238    /// Make a blocking pop call with a timeout
239    #[instrument]
240    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
241        let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
242        Ok(response.map(|(_, data)| data))
243    }
244
245    /// Pop as many items as possible up to a certain limit
246    #[instrument]
247    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
248        let limit = match NonZeroUsize::new(limit) {
249            Some(value) => value,
250            None => return Ok(Default::default()),
251        };
252        Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
253    }
254
255    /// Wait for up to the given timeout for any of the given queues to recieve a value
256    #[instrument]
257    pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
258        let timeout = timeout.unwrap_or_default().as_secs_f64();
259        if queues.is_empty() {
260            return Ok(None)
261        }
262
263        let store = &queues[0].store;        
264        let mut names = vec![];
265        for queue in queues {
266            names.push(queue.name.as_str())
267        }
268        Ok(retry_call!(store.pool, blpop, &names, timeout)?)
269    }
270}
271
/// Work around for inconsistency between ZRANGEBYSCORE and ZREMRANGEBYSCORE
///   (No limit option available or we would just be using that directly)
///
/// Selects the matching entries with ZRANGEBYSCORE (which supports LIMIT),
/// removes exactly those entries with ZREM, and returns them. Missing/blank
/// min and max args fall back to -inf/+inf respectively.
///
/// args:
///   minimum score to pop
///   maximum score to pop
///   number of elements to skip before popping any
///   max element count to pop
const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
local unpack = table.unpack or unpack
local min_score = tonumber(ARGV[1]);
if min_score == nil then min_score = -math.huge end
local max_score = tonumber(ARGV[2]);
if max_score == nil then max_score = math.huge end
local rem_offset = tonumber(ARGV[3]);
local rem_limit = tonumber(ARGV[4]);

local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
return entries
"#;

/// The length of prefixes added to the entries in the priority queue:
/// one vip digit plus a zero padded 20 digit microsecond timestamp
/// (see PriorityQueue::encode).
const SORTING_KEY_LEN: usize = 21;
296
/// A priority queue implemented on a redis sorted set
///
/// Entries are stored with negated scores, so the highest priority item has
/// the lowest score and is returned first by ZPOPMIN.
pub struct PriorityQueue<T> {
    // redis key the sorted set is stored under
    name: String,
    // server/object collection this queue lives in
    store: Arc<RedisObjects>,
    // server side lua script backing dequeue_range
    dequeue_range: redis::Script,
    // ties the queue to its message type; stores no data
    _data: PhantomData<T>,
}
304
305impl<T> std::fmt::Debug for PriorityQueue<T> {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        f.debug_struct("PriorityQueue").field("name", &self.name).field("store", &self.store).finish()
308    }
309}
310
311impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
312    pub (crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
313        Self {
314            name,
315            store,
316            dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
317            _data: PhantomData,
318        }
319    }
320
321    /// get key name used for this queue
322    pub fn name(&self) -> &str {
323        self.name.as_str()
324    }
325
326    fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
327        let vip = false;
328        let vip = if vip { 0 } else { 9 };
329        
330        let now = chrono::Utc::now().timestamp_micros();
331        let data = serde_json::to_string(&item)?;
332
333        // let value = f"{vip}{f'{int(time.time()*1000000):020}'}{json.dumps(data)}"
334        Ok(format!("{vip}{now:020}{data}").into_bytes())
335    }
336    fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
337        Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
338    }
339
340    /// Return the number of items within the two priority values (inclusive on both ends)
341    #[instrument]
342    pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
343        Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
344    }
345
346    /// Remove all the data from this queue
347    #[instrument]
348    pub async fn delete(&self) -> Result<(), ErrorTypes> {
349        retry_call!(self.store.pool, del, &self.name)
350    }
351
352    /// Get the number of items in the queue
353    #[instrument]
354    pub async fn length(&self) -> Result<u64, ErrorTypes> {
355        retry_call!(self.store.pool, zcard, &self.name)
356    }
357
358    /// Remove items from the front of the queue
359    #[instrument]
360    pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
361        if num <= 0 {
362            return Ok(Default::default())
363        };
364        let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
365        let mut out = vec![];
366        for (data, _priority) in items {
367            out.push(Self::decode(&data)?);
368        }
369        Ok(out)
370    }
371
372    /// When only one item is requested, blocking is is possible.
373    #[instrument]
374    pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
375        let result: Option<(String, Vec<u8>, f64)> = if low_priority {
376            retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
377        } else {
378            retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
379        };
380        match result {
381            Some(result) => Ok(Some(Self::decode(&result.1)?)),
382            None => Ok(None)
383        }
384    }
385
386//     def blocking_pop(self, timeout=0, low_priority=False):
387//         """When only one item is requested, blocking is is possible."""
388//         if low_priority:
389//             result = retry_call(self.c.bzpopmax, self.name, timeout)
390//         else:
391//             result = retry_call(self.c.bzpopmin, self.name, timeout)
392//         if result:
393//             return decode(result[1][SORTING_KEY_LEN:])
394//         return None
395
396    /// Dequeue a number of elements, within a specified range of scores.
397    /// Limits given are inclusive, can be made exclusive, see redis docs on how to format limits for that.
398    /// NOTE: lower/upper limit is negated+swapped in the lua script, no need to do it here
399    /// :param lower_limit: The score of all dequeued elements must be higher or equal to this.
400    /// :param upper_limit: The score of all dequeued elements must be lower or equal to this.
401    /// :param skip: In the range of available items to dequeue skip over this many.
402    /// :param num: Maximum number of elements to dequeue.
403    #[instrument]
404    pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
405        let skip = skip.unwrap_or(0);
406        let num = num.unwrap_or(1);
407        let mut call = self.dequeue_range.key(&self.name);
408
409        let inner_lower = match upper_limit {
410            Some(value) => -value,
411            None => i64::MIN,
412        };
413        let inner_upper = match lower_limit {
414            Some(value) => -value,
415            None => i64::MAX,
416        };
417
418        let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
419        let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
420        results.iter()
421            .map(|row| Self::decode(row))
422            .collect()
423        // results = retry_call(self._deque_range, keys=[self.name], args=[lower_limit, upper_limit, skip, num])
424        // return [decode(res[SORTING_KEY_LEN:]) for res in results]
425    }
426
427    /// Place an item into the queue
428    #[instrument(skip(data))]
429    pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
430        let value = Self::encode(data)?;
431        if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
432            Ok(value)
433        } else {
434            Err(ErrorTypes::UnknownRedisError)
435        }
436    }
437
438    /// Given the raw encoding of an item in queue get its position
439    #[instrument(skip(raw_value))]
440    pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
441        retry_call!(self.store.pool, zrank, &self.name, raw_value)
442    }
443
444    /// Remove a specific item from the queue based on its raw value
445    #[instrument(skip(raw_value))]
446    pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
447        let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
448        Ok(count >= 1)
449    }
450
451    /// Pop items from the low priority end of the queue
452    #[instrument]
453    pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
454        if num <= 0 {
455            return Ok(Default::default())
456        };
457        let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
458        let mut out = vec![];
459        for (data, _priority) in items {
460            out.push(Self::decode(&data)?);
461        }
462        Ok(out)
463    }
464
465    /// Pop the first item from any of the given queues within the given timeout
466    #[instrument]
467    pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
468        if queues.is_empty() {
469            return Ok(Default::default())
470        }
471
472        let _timeout = timeout.unwrap_or_default().as_secs_f64();
473        // todo!("Waiting for deadpool-redis package to upgrade to redis-rs 0.24");
474        let mut names = vec![];
475        for queue in queues {
476            names.push(queue.name.as_str());
477        }
478        let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, _timeout)?;
479
480        Ok(match response {
481            Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
482            None => None,
483        })
484    }
485
486    /// Utility function for batch reading queue lengths.
487    #[instrument]
488    pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
489        if queues.is_empty() {
490            return Ok(Default::default())
491        }
492
493        let mut pipe = redis::pipe();
494        for que in queues {
495            pipe.zcard(&que.name);
496        }
497
498        Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
499    }
500
501    
502}
503
/// Object representing a collection of simple queues with the same prefix and message type
pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
    // server/object collection these queues live in
    store: Arc<RedisObjects>,
    // redis key prefix; each queue's full key is prefix + name
    prefix: String,
    // ties the queues to their message type; stores no data
    _data: PhantomData<Message>,
}
510
511impl<Message: Serialize + DeserializeOwned> std::fmt::Debug for MultiQueue<Message> {
512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513        f.debug_struct("MultiQueue").field("store", &self.store).field("prefix", &self.prefix).finish()
514    }
515}
516
517impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
518    pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
519        Self {store, prefix, _data: Default::default()}
520    }
521
522    /// Delete one of the queues
523    #[instrument]
524    pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
525        retry_call!(self.store.pool, del, self.prefix.clone() + name)
526    }
527
528    /// Get the length of one of the queues
529    #[instrument]
530    pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
531        retry_call!(self.store.pool, llen, self.prefix.clone() + name)
532    }
533
534    /// Pop from one of the queues, returning asap if no values are available.
535    #[instrument]
536    pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
537        let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
538        match result {
539            Some(result) => Ok(serde_json::from_str(&result)?),
540            None => Ok(None)
541        }
542    }
543
544    /// Pop from one of the queues, wait up to `timeout` if no values are available.
545    #[instrument]
546    pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
547        let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
548        match result {
549            Some((_, result)) => Ok(serde_json::from_str(&result)?),
550            None => Ok(None),
551        }
552    }
553
554    /// Insert an item into one of the queues
555    #[instrument(skip(message))]
556    pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
557        let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
558        Ok(())
559    }
560}