redis_objects/
queue.rs

1//! Objects for using lists and sorted sets in redis as queues.
2
3use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::AsyncCommands;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11
12use crate::ErrorTypes;
13
14use super::{RedisObjects, retry_call};
15
16/// A FIFO queue
17/// Optionally has a server side time to live
18pub struct Queue<T: Serialize + DeserializeOwned> {
19    raw: RawQueue,
20    _data: PhantomData<T>
21}
22
23impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
24    fn clone(&self) -> Self {
25        Self { raw: self.raw.clone(), _data: self._data }
26    }
27}
28
29impl<T: Serialize + DeserializeOwned> Queue<T> {
30    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
31        Self {
32            raw: RawQueue::new(name, store, ttl),
33            _data: PhantomData,
34        }
35    }
36
37    /// Get a reference to the server/object collection holding this queue
38    pub fn host(&self) -> Arc<RedisObjects> {
39        self.raw.store.clone()
40    }
41
42    /// enqueue a single item
43    pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
44        self.raw.push(&serde_json::to_vec(data)?).await
45    }
46
47    /// enqueue a sequence of items
48    pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
49        let data: Result<Vec<Vec<u8>>, _> = data.iter()
50            .map(|item | serde_json::to_vec(item))
51            .collect();
52        self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
53    }
54
55    /// Put all messages passed back at the head of the FIFO queue.
56    pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
57        self.raw.unpop(&serde_json::to_vec(data)?).await
58    }
59
60    /// Read the number of items in the queue
61    pub async fn length(&self) -> Result<usize, ErrorTypes> {
62        self.raw.length().await
63    }
64
65    /// load the item that would be returned by the next call to pop
66    pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
67        Ok(match self.raw.peek_next().await? { 
68            Some(value) => Some(serde_json::from_slice(&value)?),
69            None => None
70        })
71    }
72
73    /// Load the entire content of the queue into memory
74    pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
75        let response: Vec<Vec<u8>> = self.raw.content().await?;
76        let mut out = vec![];
77        for data in response {
78            out.push(serde_json::from_slice(&data)?);
79        }
80        Ok(out)
81    }
82
83    /// Clear all data for this object
84    pub async fn delete(&self) -> Result<(), ErrorTypes> {
85        self.raw.delete().await
86    }
87
88    /// dequeue an item from the front of the queue, returning immediately if empty
89    pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
90        Ok(match self.raw.pop().await? { 
91            Some(value) => Some(serde_json::from_slice(&value)?),
92            None => None
93        })
94    }
95
96    /// Make a blocking pop call with a timeout
97    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
98        Ok(match self.raw.pop_timeout(timeout).await? { 
99            Some(value) => Some(serde_json::from_slice(&value)?),
100            None => None
101        })
102    }
103
104    /// Pop as many items as possible up to a certain limit
105    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
106        let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
107        let mut out = vec![];
108        for data in response {
109            out.push(serde_json::from_slice(&data)?);
110        }
111        Ok(out)
112    }
113
114    /// Wait for up to the given timeout for any of the given queues to recieve a value
115    pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
116        let queues: Vec<&RawQueue> = queues.iter().map(|queue|&queue.raw).collect();
117        let response = RawQueue::select(&queues, timeout).await?;
118        Ok(match response {
119            Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
120            None => None,
121        })
122    }
123
124    /// access the untyped raw queue underlying the typed Queue object
125    pub fn raw(&self) -> RawQueue {
126        self.raw.clone()
127    }
128}
129
130/// A FIFO queue
131/// Optionally has a server side time to live
132#[derive(Clone)]
133pub struct RawQueue {
134    name: String,
135    store: Arc<RedisObjects>,
136    ttl: Option<Duration>,
137    last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
138}
139
140impl RawQueue {
141    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
142        Self {
143            name,
144            store,
145            ttl,
146            last_expire_time: Arc::new(std::sync::Mutex::new(None)),
147        }
148    }
149
150    /// set the expiry on the queue if it has not been recently set
151    async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
152        // load the ttl of this object has one set
153        if let Some(ttl) = self.ttl {
154            {
155                // the last expire time is behind a mutex so that the queue object is threadsafe
156                let mut last_expire_time = self.last_expire_time.lock().unwrap();
157
158                // figure out if its time to update the expiry, wait until we are 25% through the
159                // ttl to avoid resetting something only milliseconds old
160                if let Some(time) = *last_expire_time {
161                    if time.elapsed() < (ttl / 4) {
162                        return Ok(())
163                    }
164                };
165
166                // update the time in the mutex then drop it so we aren't holding the lock 
167                // while we make the call to the redis server
168                *last_expire_time = Some(std::time::Instant::now());
169            }
170            let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
171        }
172        Ok(())
173    }
174
175    /// enqueue a single item
176    pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
177        let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
178        self.conditional_expire().await
179    }
180
181    /// enqueue a sequence of items
182    pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
183        let mut pipe = redis::pipe();
184        for item in data {
185            pipe.rpush(&self.name, item);
186        }
187        let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
188        self.conditional_expire().await
189    }
190
191    /// Put all messages passed back at the head of the FIFO queue.
192    pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
193        let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
194        self.conditional_expire().await
195    }
196
197    /// Read the number of items in the queue
198    pub async fn length(&self) -> Result<usize, ErrorTypes> {
199        retry_call!(self.store.pool, llen, &self.name)
200    }
201
202    /// load the item that would be returned by the next call to pop
203    pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
204        let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
205        Ok(response.into_iter().nth(0))
206    }
207
208    /// Load the entire content of the queue into memory
209    pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
210        Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
211    }
212
213    /// Clear all data for this object
214    pub async fn delete(&self) -> Result<(), ErrorTypes> {
215        retry_call!(self.store.pool, del, &self.name)
216    }
217
218    /// dequeue an item from the front of the queue, returning immediately if empty
219    pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
220        Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
221    }
222
223    /// Make a blocking pop call with a timeout
224    pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
225        let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
226        Ok(response.map(|(_, data)| data))
227    }
228
229    /// Pop as many items as possible up to a certain limit
230    pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
231        let limit = match NonZeroUsize::new(limit) {
232            Some(value) => value,
233            None => return Ok(Default::default()),
234        };
235        Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
236    }
237
238    /// Wait for up to the given timeout for any of the given queues to recieve a value
239    pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
240        let timeout = timeout.unwrap_or_default().as_secs_f64();
241        if queues.is_empty() {
242            return Ok(None)
243        }
244
245        let store = &queues[0].store;        
246        let mut names = vec![];
247        for queue in queues {
248            names.push(queue.name.as_str())
249        }
250        Ok(retry_call!(store.pool, blpop, &names, timeout)?)
251    }
252}
253
254/// Work around for inconsistency between ZRANGEBYSCORE and ZREMRANGEBYSCORE
255///   (No limit option available or we would just be using that directly)
256///
257/// args:
258///   minimum score to pop
259///   maximum score to pop
260///   number of elements to skip before popping any
261///   max element count to pop
262const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
263local unpack = table.unpack or unpack
264local min_score = tonumber(ARGV[1]);
265if min_score == nil then min_score = -math.huge end
266local max_score = tonumber(ARGV[2]);
267if max_score == nil then max_score = math.huge end
268local rem_offset = tonumber(ARGV[3]);
269local rem_limit = tonumber(ARGV[4]);
270
271local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
272if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
273return entries
274"#;
275
276/// The length of prefixes added to the entries in the priority queue
277const SORTING_KEY_LEN: usize = 21;
278
279/// A priority queue implemented on a redis sorted set
280pub struct PriorityQueue<T> {
281    name: String,
282    store: Arc<RedisObjects>,
283    dequeue_range: redis::Script,
284    _data: PhantomData<T>,
285}
286
287impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
288    pub (crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
289        Self {
290            name,
291            store,
292            dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
293            _data: PhantomData,
294        }
295    }
296
297    /// get key name used for this queue
298    pub fn name(&self) -> &str {
299        self.name.as_str()
300    }
301
302    fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
303        let vip = false;
304        let vip = if vip { 0 } else { 9 };
305        
306        let now = chrono::Utc::now().timestamp_micros();
307        let data = serde_json::to_string(&item)?;
308
309        // let value = f"{vip}{f'{int(time.time()*1000000):020}'}{json.dumps(data)}"
310        Ok(format!("{vip}{now:020}{data}").into_bytes())
311    }
312    fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
313        Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
314    }
315
316    /// Return the number of items within the two priority values (inclusive on both ends)
317    pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
318        Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
319    }
320
321    /// Remove all the data from this queue
322    pub async fn delete(&self) -> Result<(), ErrorTypes> {
323        retry_call!(self.store.pool, del, &self.name)
324    }
325
326    /// Get the number of items in the queue
327    pub async fn length(&self) -> Result<u64, ErrorTypes> {
328        retry_call!(self.store.pool, zcard, &self.name)
329    }
330
331    /// Remove items from the front of the queue
332    pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
333        if num <= 0 {
334            return Ok(Default::default())
335        };
336        let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
337        let mut out = vec![];
338        for (data, _priority) in items {
339            out.push(Self::decode(&data)?);
340        }
341        Ok(out)
342    }
343
344    /// When only one item is requested, blocking is is possible.
345    pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
346        let result: Option<(String, Vec<u8>, f64)> = if low_priority {
347            retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
348        } else {
349            retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
350        };
351        match result {
352            Some(result) => Ok(Some(Self::decode(&result.1)?)),
353            None => Ok(None)
354        }
355    }
356
357//     def blocking_pop(self, timeout=0, low_priority=False):
358//         """When only one item is requested, blocking is is possible."""
359//         if low_priority:
360//             result = retry_call(self.c.bzpopmax, self.name, timeout)
361//         else:
362//             result = retry_call(self.c.bzpopmin, self.name, timeout)
363//         if result:
364//             return decode(result[1][SORTING_KEY_LEN:])
365//         return None
366
367    /// Dequeue a number of elements, within a specified range of scores.
368    /// Limits given are inclusive, can be made exclusive, see redis docs on how to format limits for that.
369    /// NOTE: lower/upper limit is negated+swapped in the lua script, no need to do it here
370    /// :param lower_limit: The score of all dequeued elements must be higher or equal to this.
371    /// :param upper_limit: The score of all dequeued elements must be lower or equal to this.
372    /// :param skip: In the range of available items to dequeue skip over this many.
373    /// :param num: Maximum number of elements to dequeue.
374    pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
375        let skip = skip.unwrap_or(0);
376        let num = num.unwrap_or(1);
377        let mut call = self.dequeue_range.key(&self.name);
378
379        let inner_lower = match upper_limit {
380            Some(value) => -value,
381            None => i64::MIN,
382        };
383        let inner_upper = match lower_limit {
384            Some(value) => -value,
385            None => i64::MAX,
386        };
387
388        let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
389        let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
390        results.iter()
391            .map(|row| Self::decode(row))
392            .collect()
393        // results = retry_call(self._deque_range, keys=[self.name], args=[lower_limit, upper_limit, skip, num])
394        // return [decode(res[SORTING_KEY_LEN:]) for res in results]
395    }
396
397    /// Place an item into the queue
398    pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
399        let value = Self::encode(data)?;
400        if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
401            Ok(value)
402        } else {
403            Err(ErrorTypes::UnknownRedisError)
404        }
405    }
406
407    /// Given the raw encoding of an item in queue get its position
408    pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
409        retry_call!(self.store.pool, zrank, &self.name, raw_value)
410    }
411
412    /// Remove a specific item from the queue based on its raw value
413    pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
414        let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
415        Ok(count >= 1)
416    }
417
418    /// Pop items from the low priority end of the queue
419    pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
420        if num <= 0 {
421            return Ok(Default::default())
422        };
423        let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
424        let mut out = vec![];
425        for (data, _priority) in items {
426            out.push(Self::decode(&data)?);
427        }
428        Ok(out)
429    }
430
431    /// Pop the first item from any of the given queues within the given timeout
432    pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
433        if queues.is_empty() {
434            return Ok(Default::default())
435        }
436
437        let _timeout = timeout.unwrap_or_default().as_secs_f64();
438        // todo!("Waiting for deadpool-redis package to upgrade to redis-rs 0.24");
439        let mut names = vec![];
440        for queue in queues {
441            names.push(queue.name.as_str());
442        }
443        let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, _timeout)?;
444
445        Ok(match response {
446            Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
447            None => None,
448        })
449    }
450
451    /// Utility function for batch reading queue lengths.
452    pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
453        if queues.is_empty() {
454            return Ok(Default::default())
455        }
456
457        let mut pipe = redis::pipe();
458        for que in queues {
459            pipe.zcard(&que.name);
460        }
461
462        Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
463    }
464
465    
466}
467
468/// Object represeting a colleciton of simple queues with the same prefix and message type 
469pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
470    store: Arc<RedisObjects>,
471    prefix: String,
472    _data: PhantomData<Message>,
473}
474
475impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
476    pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
477        Self {store, prefix, _data: Default::default()}
478    }
479
480    /// Delete one of the queues
481    pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
482        retry_call!(self.store.pool, del, self.prefix.clone() + name)
483    }
484
485    /// Get the length of one of the queues
486    pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
487        retry_call!(self.store.pool, llen, self.prefix.clone() + name)
488    }
489
490    /// Pop from one of the queues, returning asap if no values are available.
491    pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
492        let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
493        match result {
494            Some(result) => Ok(serde_json::from_str(&result)?),
495            None => Ok(None)
496        }
497    }
498
499    /// Pop from one of the queues, wait up to `timeout` if no values are available.
500    pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
501        let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
502        match result {
503            Some((_, result)) => Ok(serde_json::from_str(&result)?),
504            None => Ok(None),
505        }
506    }
507
508    /// Insert an item into one of the queues
509    pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
510        let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
511        Ok(())
512    }
513}