1use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::AsyncCommands;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use tracing::instrument;
12
13use crate::ErrorTypes;
14
15use super::{RedisObjects, retry_call};
16
/// A queue of `T` values stored through a [`RawQueue`], with every item serialised as JSON.
pub struct Queue<T: Serialize + DeserializeOwned> {
    /// Untyped queue that holds the serialised bytes and issues the redis commands.
    raw: RawQueue,
    /// Marker tying the queue to its item type; no `T` is stored in the handle itself.
    _data: PhantomData<T>
}
23
24impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
25 fn clone(&self) -> Self {
26 Self { raw: self.raw.clone(), _data: self._data }
27 }
28}
29
30impl<T: Serialize + DeserializeOwned> Queue<T> {
31 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
32 Self {
33 raw: RawQueue::new(name, store, ttl),
34 _data: PhantomData,
35 }
36 }
37
38 pub fn host(&self) -> Arc<RedisObjects> {
40 self.raw.store.clone()
41 }
42
43 pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
45 self.raw.push(&serde_json::to_vec(data)?).await
46 }
47
48 pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
50 let data: Result<Vec<Vec<u8>>, _> = data.iter()
51 .map(|item | serde_json::to_vec(item))
52 .collect();
53 self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
54 }
55
56 pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
58 self.raw.unpop(&serde_json::to_vec(data)?).await
59 }
60
61 pub async fn length(&self) -> Result<usize, ErrorTypes> {
63 self.raw.length().await
64 }
65
66 pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
68 Ok(match self.raw.peek_next().await? {
69 Some(value) => Some(serde_json::from_slice(&value)?),
70 None => None
71 })
72 }
73
74 pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
76 let response: Vec<Vec<u8>> = self.raw.content().await?;
77 let mut out = vec![];
78 for data in response {
79 out.push(serde_json::from_slice(&data)?);
80 }
81 Ok(out)
82 }
83
84 pub async fn delete(&self) -> Result<(), ErrorTypes> {
86 self.raw.delete().await
87 }
88
89 pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
91 Ok(match self.raw.pop().await? {
92 Some(value) => Some(serde_json::from_slice(&value)?),
93 None => None
94 })
95 }
96
97 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
99 Ok(match self.raw.pop_timeout(timeout).await? {
100 Some(value) => Some(serde_json::from_slice(&value)?),
101 None => None
102 })
103 }
104
105 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
107 let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
108 let mut out = vec![];
109 for data in response {
110 out.push(serde_json::from_slice(&data)?);
111 }
112 Ok(out)
113 }
114
115 pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
117 let queues: Vec<&RawQueue> = queues.iter().map(|queue|&queue.raw).collect();
118 let response = RawQueue::select(&queues, timeout).await?;
119 Ok(match response {
120 Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
121 None => None,
122 })
123 }
124
125 pub fn raw(&self) -> RawQueue {
127 self.raw.clone()
128 }
129}
130
/// A queue of raw byte strings kept in a single redis list.
#[derive(Clone)]
pub struct RawQueue {
    /// Redis key of the list.
    name: String,
    /// Store whose connection pool is used for every command.
    store: Arc<RedisObjects>,
    /// Optional expiry applied to the key after items are added.
    ttl: Option<Duration>,
    /// When the expiry was last refreshed; the `Arc` makes clones of this handle share it.
    last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
}
140
141impl std::fmt::Debug for RawQueue {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("RawQueue").field("name", &self.name).field("store", &self.store).finish()
144 }
145}
146
147impl RawQueue {
148 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
149 Self {
150 name,
151 store,
152 ttl,
153 last_expire_time: Arc::new(std::sync::Mutex::new(None)),
154 }
155 }
156
157 async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
159 if let Some(ttl) = self.ttl {
161 {
162 let mut last_expire_time = self.last_expire_time.lock().unwrap();
164
165 if let Some(time) = *last_expire_time {
168 if time.elapsed() < (ttl / 4) {
169 return Ok(())
170 }
171 };
172
173 *last_expire_time = Some(std::time::Instant::now());
176 }
177 let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
178 }
179 Ok(())
180 }
181
182 #[instrument(skip(data))]
184 pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
185 let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
186 self.conditional_expire().await
187 }
188
189 #[instrument(skip(data))]
191 pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
192 let mut pipe = redis::pipe();
193 for item in data {
194 pipe.rpush(&self.name, item);
195 }
196 let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
197 self.conditional_expire().await
198 }
199
200 #[instrument(skip(data))]
202 pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
203 let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
204 self.conditional_expire().await
205 }
206
207 #[instrument]
209 pub async fn length(&self) -> Result<usize, ErrorTypes> {
210 retry_call!(self.store.pool, llen, &self.name)
211 }
212
213 #[instrument]
215 pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
216 let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
217 Ok(response.into_iter().nth(0))
218 }
219
220 #[instrument]
222 pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
223 Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
224 }
225
226 #[instrument]
228 pub async fn delete(&self) -> Result<(), ErrorTypes> {
229 retry_call!(self.store.pool, del, &self.name)
230 }
231
232 #[instrument]
234 pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
235 Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
236 }
237
238 #[instrument]
240 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
241 let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
242 Ok(response.map(|(_, data)| data))
243 }
244
245 #[instrument]
247 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
248 let limit = match NonZeroUsize::new(limit) {
249 Some(value) => value,
250 None => return Ok(Default::default()),
251 };
252 Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
253 }
254
255 #[instrument]
257 pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
258 let timeout = timeout.unwrap_or_default().as_secs_f64();
259 if queues.is_empty() {
260 return Ok(None)
261 }
262
263 let store = &queues[0].store;
264 let mut names = vec![];
265 for queue in queues {
266 names.push(queue.name.as_str())
267 }
268 Ok(retry_call!(store.pool, blpop, &names, timeout)?)
269 }
270}
271
/// Lua script that reads and removes a range of entries from the sorted set at `KEYS[1]`.
///
/// * `ARGV[1]` / `ARGV[2]`: minimum / maximum score; a value that is not a number falls back
///   to negative / positive infinity.
/// * `ARGV[3]` / `ARGV[4]`: offset and limit handed to `zrangebyscore ... limit`.
///
/// The entries that were found are removed with `zrem` and then returned.
const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
local unpack = table.unpack or unpack
local min_score = tonumber(ARGV[1]);
if min_score == nil then min_score = -math.huge end
local max_score = tonumber(ARGV[2]);
if max_score == nil then max_score = math.huge end
local rem_offset = tonumber(ARGV[3]);
local rem_limit = tonumber(ARGV[4]);

local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
return entries
"#;
293
/// Length in bytes of the prefix `PriorityQueue::encode` puts in front of the JSON payload:
/// one vip digit followed by a 20-digit zero-padded microsecond timestamp. `decode` skips it.
const SORTING_KEY_LEN: usize = 21;
296
/// A priority queue of JSON-serialised `T` values kept in a redis sorted set.
pub struct PriorityQueue<T> {
    /// Redis key of the sorted set.
    name: String,
    /// Store whose connection pool is used for every command.
    store: Arc<RedisObjects>,
    /// Script handle built from `PQ_DEQUEUE_RANGE_SCRIPT`, used by `dequeue_range`.
    dequeue_range: redis::Script,
    /// Marker tying the queue to its item type; no `T` is stored in the handle itself.
    _data: PhantomData<T>,
}
304
305impl<T> std::fmt::Debug for PriorityQueue<T> {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 f.debug_struct("PriorityQueue").field("name", &self.name).field("store", &self.store).finish()
308 }
309}
310
311impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
312 pub (crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
313 Self {
314 name,
315 store,
316 dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
317 _data: PhantomData,
318 }
319 }
320
321 pub fn name(&self) -> &str {
323 self.name.as_str()
324 }
325
326 fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
327 let vip = false;
328 let vip = if vip { 0 } else { 9 };
329
330 let now = chrono::Utc::now().timestamp_micros();
331 let data = serde_json::to_string(&item)?;
332
333 Ok(format!("{vip}{now:020}{data}").into_bytes())
335 }
336 fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
337 Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
338 }
339
340 #[instrument]
342 pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
343 Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
344 }
345
346 #[instrument]
348 pub async fn delete(&self) -> Result<(), ErrorTypes> {
349 retry_call!(self.store.pool, del, &self.name)
350 }
351
352 #[instrument]
354 pub async fn length(&self) -> Result<u64, ErrorTypes> {
355 retry_call!(self.store.pool, zcard, &self.name)
356 }
357
358 #[instrument]
360 pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
361 if num <= 0 {
362 return Ok(Default::default())
363 };
364 let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
365 let mut out = vec![];
366 for (data, _priority) in items {
367 out.push(Self::decode(&data)?);
368 }
369 Ok(out)
370 }
371
372 #[instrument]
374 pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
375 let result: Option<(String, Vec<u8>, f64)> = if low_priority {
376 retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
377 } else {
378 retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
379 };
380 match result {
381 Some(result) => Ok(Some(Self::decode(&result.1)?)),
382 None => Ok(None)
383 }
384 }
385
386#[instrument]
404 pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
405 let skip = skip.unwrap_or(0);
406 let num = num.unwrap_or(1);
407 let mut call = self.dequeue_range.key(&self.name);
408
409 let inner_lower = match upper_limit {
410 Some(value) => -value,
411 None => i64::MIN,
412 };
413 let inner_upper = match lower_limit {
414 Some(value) => -value,
415 None => i64::MAX,
416 };
417
418 let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
419 let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
420 results.iter()
421 .map(|row| Self::decode(row))
422 .collect()
423 }
426
427 #[instrument(skip(data))]
429 pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
430 let value = Self::encode(data)?;
431 if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
432 Ok(value)
433 } else {
434 Err(ErrorTypes::UnknownRedisError)
435 }
436 }
437
438 #[instrument(skip(raw_value))]
440 pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
441 retry_call!(self.store.pool, zrank, &self.name, raw_value)
442 }
443
444 #[instrument(skip(raw_value))]
446 pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
447 let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
448 Ok(count >= 1)
449 }
450
451 #[instrument]
453 pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
454 if num <= 0 {
455 return Ok(Default::default())
456 };
457 let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
458 let mut out = vec![];
459 for (data, _priority) in items {
460 out.push(Self::decode(&data)?);
461 }
462 Ok(out)
463 }
464
465 #[instrument]
467 pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
468 if queues.is_empty() {
469 return Ok(Default::default())
470 }
471
472 let _timeout = timeout.unwrap_or_default().as_secs_f64();
473 let mut names = vec![];
475 for queue in queues {
476 names.push(queue.name.as_str());
477 }
478 let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, _timeout)?;
479
480 Ok(match response {
481 Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
482 None => None,
483 })
484 }
485
486 #[instrument]
488 pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
489 if queues.is_empty() {
490 return Ok(Default::default())
491 }
492
493 let mut pipe = redis::pipe();
494 for que in queues {
495 pipe.zcard(&que.name);
496 }
497
498 Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
499 }
500
501
502}
503
/// A family of redis list queues that share a key prefix and carry JSON-serialised `Message`s.
pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
    /// Store whose connection pool is used for every command.
    store: Arc<RedisObjects>,
    /// Prefix put in front of the queue name to form the redis key.
    prefix: String,
    /// Marker tying the handle to its message type; no `Message` is stored in it.
    _data: PhantomData<Message>,
}
510
511impl<Message: Serialize + DeserializeOwned> std::fmt::Debug for MultiQueue<Message> {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 f.debug_struct("MultiQueue").field("store", &self.store).field("prefix", &self.prefix).finish()
514 }
515}
516
517impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
518 pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
519 Self {store, prefix, _data: Default::default()}
520 }
521
522 #[instrument]
524 pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
525 retry_call!(self.store.pool, del, self.prefix.clone() + name)
526 }
527
528 #[instrument]
530 pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
531 retry_call!(self.store.pool, llen, self.prefix.clone() + name)
532 }
533
534 #[instrument]
536 pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
537 let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
538 match result {
539 Some(result) => Ok(serde_json::from_str(&result)?),
540 None => Ok(None)
541 }
542 }
543
544 #[instrument]
546 pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
547 let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
548 match result {
549 Some((_, result)) => Ok(serde_json::from_str(&result)?),
550 None => Ok(None),
551 }
552 }
553
554 #[instrument(skip(message))]
556 pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
557 let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
558 Ok(())
559 }
560}