1use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::AsyncCommands;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use tracing::instrument;
12
13use crate::ErrorTypes;
14
15use super::{RedisObjects, retry_call};
16
17pub struct Queue<T: Serialize + DeserializeOwned> {
20 raw: RawQueue,
21 _data: PhantomData<T>
22}
23
24impl<T: Serialize + DeserializeOwned> std::fmt::Debug for Queue<T> {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("Queue").field("name", &self.raw.name).finish()
27 }
28}
29
30impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
31 fn clone(&self) -> Self {
32 Self { raw: self.raw.clone(), _data: self._data }
33 }
34}
35
36impl<T: Serialize + DeserializeOwned> Queue<T> {
37 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
38 Self {
39 raw: RawQueue::new(name, store, ttl),
40 _data: PhantomData,
41 }
42 }
43
44 pub fn host(&self) -> Arc<RedisObjects> {
46 self.raw.store.clone()
47 }
48
49 pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
51 self.raw.push(&serde_json::to_vec(data)?).await
52 }
53
54 pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
56 let data: Result<Vec<Vec<u8>>, _> = data.iter()
57 .map(|item | serde_json::to_vec(item))
58 .collect();
59 self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
60 }
61
62 pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
64 self.raw.unpop(&serde_json::to_vec(data)?).await
65 }
66
67 pub async fn length(&self) -> Result<usize, ErrorTypes> {
69 self.raw.length().await
70 }
71
72 pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
74 Ok(match self.raw.peek_next().await? {
75 Some(value) => Some(serde_json::from_slice(&value)?),
76 None => None
77 })
78 }
79
80 pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
82 let response: Vec<Vec<u8>> = self.raw.content().await?;
83 let mut out = vec![];
84 for data in response {
85 out.push(serde_json::from_slice(&data)?);
86 }
87 Ok(out)
88 }
89
90 pub async fn delete(&self) -> Result<(), ErrorTypes> {
92 self.raw.delete().await
93 }
94
95 pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
97 Ok(match self.raw.pop().await? {
98 Some(value) => Some(serde_json::from_slice(&value)?),
99 None => None
100 })
101 }
102
103 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
105 Ok(match self.raw.pop_timeout(timeout).await? {
106 Some(value) => Some(serde_json::from_slice(&value)?),
107 None => None
108 })
109 }
110
111 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
113 let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
114 let mut out = vec![];
115 for data in response {
116 out.push(serde_json::from_slice(&data)?);
117 }
118 Ok(out)
119 }
120
121 pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
123 let queues: Vec<&RawQueue> = queues.iter().map(|queue|&queue.raw).collect();
124 let response = RawQueue::select(&queues, timeout).await?;
125 Ok(match response {
126 Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
127 None => None,
128 })
129 }
130
131 pub fn raw(&self) -> RawQueue {
133 self.raw.clone()
134 }
135}
136
137#[derive(Clone)]
140pub struct RawQueue {
141 name: String,
142 store: Arc<RedisObjects>,
143 ttl: Option<Duration>,
144 last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
145}
146
147impl std::fmt::Debug for RawQueue {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("RawQueue").field("name", &self.name).field("store", &self.store).finish()
150 }
151}
152
153impl RawQueue {
154 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
155 Self {
156 name,
157 store,
158 ttl,
159 last_expire_time: Arc::new(std::sync::Mutex::new(None)),
160 }
161 }
162
163 async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
165 if let Some(ttl) = self.ttl {
167 {
168 let mut last_expire_time = self.last_expire_time.lock().unwrap();
170
171 if let Some(time) = *last_expire_time {
174 if time.elapsed() < (ttl / 4) {
175 return Ok(())
176 }
177 };
178
179 *last_expire_time = Some(std::time::Instant::now());
182 }
183 let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
184 }
185 Ok(())
186 }
187
188 #[instrument(skip(data))]
190 pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
191 let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
192 self.conditional_expire().await
193 }
194
195 #[instrument(skip(data))]
197 pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
198 let mut pipe = redis::pipe();
199 for item in data {
200 pipe.rpush(&self.name, item);
201 }
202 let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
203 self.conditional_expire().await
204 }
205
206 #[instrument(skip(data))]
208 pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
209 let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
210 self.conditional_expire().await
211 }
212
213 #[instrument]
215 pub async fn length(&self) -> Result<usize, ErrorTypes> {
216 retry_call!(self.store.pool, llen, &self.name)
217 }
218
219 #[instrument]
221 pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
222 let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
223 Ok(response.into_iter().nth(0))
224 }
225
226 #[instrument]
228 pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
229 Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
230 }
231
232 #[instrument]
234 pub async fn delete(&self) -> Result<(), ErrorTypes> {
235 retry_call!(self.store.pool, del, &self.name)
236 }
237
238 #[instrument]
240 pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
241 Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
242 }
243
244 #[instrument]
246 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
247 let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
248 Ok(response.map(|(_, data)| data))
249 }
250
251 #[instrument]
253 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
254 let limit = match NonZeroUsize::new(limit) {
255 Some(value) => value,
256 None => return Ok(Default::default()),
257 };
258 Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
259 }
260
261 #[instrument]
263 pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
264 let timeout = timeout.unwrap_or_default().as_secs_f64();
265 if queues.is_empty() {
266 return Ok(None)
267 }
268
269 let store = &queues[0].store;
270 let mut names = vec![];
271 for queue in queues {
272 names.push(queue.name.as_str())
273 }
274 Ok(retry_call!(store.pool, blpop, &names, timeout)?)
275 }
276}
277
278const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
287local unpack = table.unpack or unpack
288local min_score = tonumber(ARGV[1]);
289if min_score == nil then min_score = -math.huge end
290local max_score = tonumber(ARGV[2]);
291if max_score == nil then max_score = math.huge end
292local rem_offset = tonumber(ARGV[3]);
293local rem_limit = tonumber(ARGV[4]);
294
295local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
296if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
297return entries
298"#;
299
300const SORTING_KEY_LEN: usize = 21;
302
303pub struct PriorityQueue<T> {
305 name: String,
306 store: Arc<RedisObjects>,
307 dequeue_range: redis::Script,
308 _data: PhantomData<T>,
309}
310
311impl<T> std::fmt::Debug for PriorityQueue<T> {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 f.debug_struct("PriorityQueue").field("name", &self.name).field("store", &self.store).finish()
314 }
315}
316
317impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
318 pub (crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
319 Self {
320 name,
321 store,
322 dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
323 _data: PhantomData,
324 }
325 }
326
327 pub fn name(&self) -> &str {
329 self.name.as_str()
330 }
331
332 fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
333 let vip = false;
334 let vip = if vip { 0 } else { 9 };
335
336 let now = chrono::Utc::now().timestamp_micros();
337 let data = serde_json::to_string(&item)?;
338
339 Ok(format!("{vip}{now:020}{data}").into_bytes())
341 }
342 fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
343 Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
344 }
345
346 #[instrument]
348 pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
349 Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
350 }
351
352 #[instrument]
354 pub async fn delete(&self) -> Result<(), ErrorTypes> {
355 retry_call!(self.store.pool, del, &self.name)
356 }
357
358 #[instrument]
360 pub async fn length(&self) -> Result<u64, ErrorTypes> {
361 retry_call!(self.store.pool, zcard, &self.name)
362 }
363
364 #[instrument]
366 pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
367 if num <= 0 {
368 return Ok(Default::default())
369 };
370 let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
371 let mut out = vec![];
372 for (data, _priority) in items {
373 out.push(Self::decode(&data)?);
374 }
375 Ok(out)
376 }
377
378 #[instrument]
380 pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
381 let result: Option<(String, Vec<u8>, f64)> = if low_priority {
382 retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
383 } else {
384 retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
385 };
386 match result {
387 Some(result) => Ok(Some(Self::decode(&result.1)?)),
388 None => Ok(None)
389 }
390 }
391
392#[instrument]
410 pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
411 let skip = skip.unwrap_or(0);
412 let num = num.unwrap_or(1);
413 let mut call = self.dequeue_range.key(&self.name);
414
415 let inner_lower = match upper_limit {
416 Some(value) => -value,
417 None => i64::MIN,
418 };
419 let inner_upper = match lower_limit {
420 Some(value) => -value,
421 None => i64::MAX,
422 };
423
424 let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
425 let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
426 results.iter()
427 .map(|row| Self::decode(row))
428 .collect()
429 }
432
433 #[instrument(skip(data))]
435 pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
436 let value = Self::encode(data)?;
437 if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
438 Ok(value)
439 } else {
440 Err(ErrorTypes::UnknownRedisError)
441 }
442 }
443
444 #[instrument(skip(raw_value))]
446 pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
447 retry_call!(self.store.pool, zrank, &self.name, raw_value)
448 }
449
450 #[instrument(skip(raw_value))]
452 pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
453 let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
454 Ok(count >= 1)
455 }
456
457 #[instrument]
459 pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
460 if num <= 0 {
461 return Ok(Default::default())
462 };
463 let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
464 let mut out = vec![];
465 for (data, _priority) in items {
466 out.push(Self::decode(&data)?);
467 }
468 Ok(out)
469 }
470
471 #[instrument]
473 pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
474 if queues.is_empty() {
475 return Ok(Default::default())
476 }
477
478 let _timeout = timeout.unwrap_or_default().as_secs_f64();
479 let mut names = vec![];
481 for queue in queues {
482 names.push(queue.name.as_str());
483 }
484 let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, _timeout)?;
485
486 Ok(match response {
487 Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
488 None => None,
489 })
490 }
491
492 #[instrument]
494 pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
495 if queues.is_empty() {
496 return Ok(Default::default())
497 }
498
499 let mut pipe = redis::pipe();
500 for que in queues {
501 pipe.zcard(&que.name);
502 }
503
504 Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
505 }
506
507
508}
509
510pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
512 store: Arc<RedisObjects>,
513 prefix: String,
514 _data: PhantomData<Message>,
515}
516
517impl<Message: Serialize + DeserializeOwned> std::fmt::Debug for MultiQueue<Message> {
518 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
519 f.debug_struct("MultiQueue").field("store", &self.store).field("prefix", &self.prefix).finish()
520 }
521}
522
523impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
524 pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
525 Self {store, prefix, _data: Default::default()}
526 }
527
528 #[instrument]
530 pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
531 retry_call!(self.store.pool, del, self.prefix.clone() + name)
532 }
533
534 #[instrument]
536 pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
537 retry_call!(self.store.pool, llen, self.prefix.clone() + name)
538 }
539
540 #[instrument]
542 pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
543 let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
544 match result {
545 Some(result) => Ok(serde_json::from_str(&result)?),
546 None => Ok(None)
547 }
548 }
549
550 #[instrument]
552 pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
553 let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
554 match result {
555 Some((_, result)) => Ok(serde_json::from_str(&result)?),
556 None => Ok(None),
557 }
558 }
559
560 #[instrument(skip(message))]
562 pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
563 let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
564 Ok(())
565 }
566}