1use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::AsyncCommands;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11
12use crate::ErrorTypes;
13
14use super::{RedisObjects, retry_call};
15
16pub struct Queue<T: Serialize + DeserializeOwned> {
19 raw: RawQueue,
20 _data: PhantomData<T>
21}
22
23impl<T: Serialize + DeserializeOwned> Clone for Queue<T> {
24 fn clone(&self) -> Self {
25 Self { raw: self.raw.clone(), _data: self._data }
26 }
27}
28
29impl<T: Serialize + DeserializeOwned> Queue<T> {
30 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
31 Self {
32 raw: RawQueue::new(name, store, ttl),
33 _data: PhantomData,
34 }
35 }
36
37 pub fn host(&self) -> Arc<RedisObjects> {
39 self.raw.store.clone()
40 }
41
42 pub async fn push(&self, data: &T) -> Result<(), ErrorTypes> {
44 self.raw.push(&serde_json::to_vec(data)?).await
45 }
46
47 pub async fn push_batch(&self, data: &[T]) -> Result<(), ErrorTypes> {
49 let data: Result<Vec<Vec<u8>>, _> = data.iter()
50 .map(|item | serde_json::to_vec(item))
51 .collect();
52 self.raw.push_batch(data?.iter().map(|item| item.as_slice())).await
53 }
54
55 pub async fn unpop(&self, data: &T) -> Result<(), ErrorTypes> {
57 self.raw.unpop(&serde_json::to_vec(data)?).await
58 }
59
60 pub async fn length(&self) -> Result<usize, ErrorTypes> {
62 self.raw.length().await
63 }
64
65 pub async fn peek_next(&self) -> Result<Option<T>, ErrorTypes> {
67 Ok(match self.raw.peek_next().await? {
68 Some(value) => Some(serde_json::from_slice(&value)?),
69 None => None
70 })
71 }
72
73 pub async fn content(&self) -> Result<Vec<T>, ErrorTypes> {
75 let response: Vec<Vec<u8>> = self.raw.content().await?;
76 let mut out = vec![];
77 for data in response {
78 out.push(serde_json::from_slice(&data)?);
79 }
80 Ok(out)
81 }
82
83 pub async fn delete(&self) -> Result<(), ErrorTypes> {
85 self.raw.delete().await
86 }
87
88 pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
90 Ok(match self.raw.pop().await? {
91 Some(value) => Some(serde_json::from_slice(&value)?),
92 None => None
93 })
94 }
95
96 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<T>, ErrorTypes> {
98 Ok(match self.raw.pop_timeout(timeout).await? {
99 Some(value) => Some(serde_json::from_slice(&value)?),
100 None => None
101 })
102 }
103
104 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<T>, ErrorTypes> {
106 let response: Vec<Vec<u8>> = self.raw.pop_batch(limit).await?;
107 let mut out = vec![];
108 for data in response {
109 out.push(serde_json::from_slice(&data)?);
110 }
111 Ok(out)
112 }
113
114 pub async fn select(queues: &[&Queue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
116 let queues: Vec<&RawQueue> = queues.iter().map(|queue|&queue.raw).collect();
117 let response = RawQueue::select(&queues, timeout).await?;
118 Ok(match response {
119 Some((name, data)) => Some((name, serde_json::from_slice(&data)?)),
120 None => None,
121 })
122 }
123
124 pub fn raw(&self) -> RawQueue {
126 self.raw.clone()
127 }
128}
129
130#[derive(Clone)]
133pub struct RawQueue {
134 name: String,
135 store: Arc<RedisObjects>,
136 ttl: Option<Duration>,
137 last_expire_time: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
138}
139
140impl RawQueue {
141 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
142 Self {
143 name,
144 store,
145 ttl,
146 last_expire_time: Arc::new(std::sync::Mutex::new(None)),
147 }
148 }
149
150 async fn conditional_expire(&self) -> Result<(), ErrorTypes> {
152 if let Some(ttl) = self.ttl {
154 {
155 let mut last_expire_time = self.last_expire_time.lock().unwrap();
157
158 if let Some(time) = *last_expire_time {
161 if time.elapsed() < (ttl / 4) {
162 return Ok(())
163 }
164 };
165
166 *last_expire_time = Some(std::time::Instant::now());
169 }
170 let _: () = retry_call!(self.store.pool, expire, &self.name, ttl.as_secs() as i64)?;
171 }
172 Ok(())
173 }
174
175 pub async fn push(&self, data: &[u8]) -> Result<(), ErrorTypes> {
177 let _: () = retry_call!(self.store.pool, rpush, &self.name, data)?;
178 self.conditional_expire().await
179 }
180
181 pub async fn push_batch(&self, data: impl Iterator<Item=&[u8]>) -> Result<(), ErrorTypes> {
183 let mut pipe = redis::pipe();
184 for item in data {
185 pipe.rpush(&self.name, item);
186 }
187 let _: () = retry_call!(method, self.store.pool, pipe, query_async)?;
188 self.conditional_expire().await
189 }
190
191 pub async fn unpop(&self, data: &[u8]) -> Result<(), ErrorTypes> {
193 let _: () = retry_call!(self.store.pool, lpush, &self.name, data)?;
194 self.conditional_expire().await
195 }
196
197 pub async fn length(&self) -> Result<usize, ErrorTypes> {
199 retry_call!(self.store.pool, llen, &self.name)
200 }
201
202 pub async fn peek_next(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
204 let response: Vec<Vec<u8>> = retry_call!(self.store.pool, lrange, &self.name, 0, 0)?;
205 Ok(response.into_iter().nth(0))
206 }
207
208 pub async fn content(&self) -> Result<Vec<Vec<u8>>, ErrorTypes> {
210 Ok(retry_call!(self.store.pool, lrange, &self.name, 0, -1)?)
211 }
212
213 pub async fn delete(&self) -> Result<(), ErrorTypes> {
215 retry_call!(self.store.pool, del, &self.name)
216 }
217
218 pub async fn pop(&self) -> Result<Option<Vec<u8>>, ErrorTypes> {
220 Ok(retry_call!(self.store.pool, lpop, &self.name, None)?)
221 }
222
223 pub async fn pop_timeout(&self, timeout: Duration) -> Result<Option<Vec<u8>>, ErrorTypes> {
225 let response: Option<(String, Vec<u8>)> = retry_call!(self.store.pool, blpop, &self.name, timeout.as_secs_f64())?;
226 Ok(response.map(|(_, data)| data))
227 }
228
229 pub async fn pop_batch(&self, limit: usize) -> Result<Vec<Vec<u8>>, ErrorTypes> {
231 let limit = match NonZeroUsize::new(limit) {
232 Some(value) => value,
233 None => return Ok(Default::default()),
234 };
235 Ok(retry_call!(self.store.pool, lpop, &self.name, Some(limit))?)
236 }
237
238 pub async fn select(queues: &[&RawQueue], timeout: Option<Duration>) -> Result<Option<(String, Vec<u8>)>, ErrorTypes> {
240 let timeout = timeout.unwrap_or_default().as_secs_f64();
241 if queues.is_empty() {
242 return Ok(None)
243 }
244
245 let store = &queues[0].store;
246 let mut names = vec![];
247 for queue in queues {
248 names.push(queue.name.as_str())
249 }
250 Ok(retry_call!(store.pool, blpop, &names, timeout)?)
251 }
252}
253
254const PQ_DEQUEUE_RANGE_SCRIPT: &str = r#"
263local unpack = table.unpack or unpack
264local min_score = tonumber(ARGV[1]);
265if min_score == nil then min_score = -math.huge end
266local max_score = tonumber(ARGV[2]);
267if max_score == nil then max_score = math.huge end
268local rem_offset = tonumber(ARGV[3]);
269local rem_limit = tonumber(ARGV[4]);
270
271local entries = redis.call("zrangebyscore", KEYS[1], min_score, max_score, "limit", rem_offset, rem_limit);
272if #entries > 0 then redis.call("zrem", KEYS[1], unpack(entries)) end
273return entries
274"#;
275
276const SORTING_KEY_LEN: usize = 21;
278
279pub struct PriorityQueue<T> {
281 name: String,
282 store: Arc<RedisObjects>,
283 dequeue_range: redis::Script,
284 _data: PhantomData<T>,
285}
286
287impl<T: Serialize + DeserializeOwned> PriorityQueue<T> {
288 pub (crate) fn new(name: String, store: Arc<RedisObjects>) -> Self {
289 Self {
290 name,
291 store,
292 dequeue_range: redis::Script::new(PQ_DEQUEUE_RANGE_SCRIPT),
293 _data: PhantomData,
294 }
295 }
296
297 pub fn name(&self) -> &str {
299 self.name.as_str()
300 }
301
302 fn encode(item: &T) -> Result<Vec<u8>, ErrorTypes> {
303 let vip = false;
304 let vip = if vip { 0 } else { 9 };
305
306 let now = chrono::Utc::now().timestamp_micros();
307 let data = serde_json::to_string(&item)?;
308
309 Ok(format!("{vip}{now:020}{data}").into_bytes())
311 }
312 fn decode(data: &[u8]) -> Result<T, ErrorTypes> {
313 Ok(serde_json::from_slice(&data[SORTING_KEY_LEN..])?)
314 }
315
316 pub async fn count(&self, lowest: f64, highest: f64) -> Result<u64, ErrorTypes> {
318 Ok(retry_call!(self.store.pool, zcount, &self.name, -highest, -lowest)?)
319 }
320
321 pub async fn delete(&self) -> Result<(), ErrorTypes> {
323 retry_call!(self.store.pool, del, &self.name)
324 }
325
326 pub async fn length(&self) -> Result<u64, ErrorTypes> {
328 retry_call!(self.store.pool, zcard, &self.name)
329 }
330
331 pub async fn pop(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
333 if num <= 0 {
334 return Ok(Default::default())
335 };
336 let items: Vec<(Vec<u8>, f64)> = retry_call!(self.store.pool, zpopmin, &self.name, num)?;
337 let mut out = vec![];
338 for (data, _priority) in items {
339 out.push(Self::decode(&data)?);
340 }
341 Ok(out)
342 }
343
344 pub async fn blocking_pop(&self, timeout: Duration, low_priority: bool) -> Result<Option<T>, ErrorTypes> {
346 let result: Option<(String, Vec<u8>, f64)> = if low_priority {
347 retry_call!(self.store.pool, bzpopmax, &self.name, timeout.as_secs_f64())?
348 } else {
349 retry_call!(self.store.pool, bzpopmin, &self.name, timeout.as_secs_f64())?
350 };
351 match result {
352 Some(result) => Ok(Some(Self::decode(&result.1)?)),
353 None => Ok(None)
354 }
355 }
356
357pub async fn dequeue_range(&self, lower_limit: Option<i64>, upper_limit: Option<i64>, skip: Option<u32>, num: Option<u32>) -> Result<Vec<T>, ErrorTypes> {
375 let skip = skip.unwrap_or(0);
376 let num = num.unwrap_or(1);
377 let mut call = self.dequeue_range.key(&self.name);
378
379 let inner_lower = match upper_limit {
380 Some(value) => -value,
381 None => i64::MIN,
382 };
383 let inner_upper = match lower_limit {
384 Some(value) => -value,
385 None => i64::MAX,
386 };
387
388 let call = call.arg(inner_lower).arg(inner_upper).arg(skip).arg(num);
389 let results: Vec<Vec<u8>> = retry_call!(method, self.store.pool, call, invoke_async)?;
390 results.iter()
391 .map(|row| Self::decode(row))
392 .collect()
393 }
396
397 pub async fn push(&self, priority: f64, data: &T) -> Result<Vec<u8>, ErrorTypes> {
399 let value = Self::encode(data)?;
400 if retry_call!(self.store.pool, zadd, &self.name, &value, -priority)? {
401 Ok(value)
402 } else {
403 Err(ErrorTypes::UnknownRedisError)
404 }
405 }
406
407 pub async fn rank(&self, raw_value: &[u8]) -> Result<Option<u64>, ErrorTypes> {
409 retry_call!(self.store.pool, zrank, &self.name, raw_value)
410 }
411
412 pub async fn remove(&self, raw_value: &[u8]) -> Result<bool, ErrorTypes> {
414 let count: i32 = retry_call!(self.store.pool, zrem, &self.name, raw_value)?;
415 Ok(count >= 1)
416 }
417
418 pub async fn unpush(&self, num: isize) -> Result<Vec<T>, ErrorTypes> {
420 if num <= 0 {
421 return Ok(Default::default())
422 };
423 let items: Vec<(Vec<u8>, i32)> = retry_call!(self.store.pool, zpopmax, &self.name, num)?;
424 let mut out = vec![];
425 for (data, _priority) in items {
426 out.push(Self::decode(&data)?);
427 }
428 Ok(out)
429 }
430
431 pub async fn select(queues: &[&PriorityQueue<T>], timeout: Option<Duration>) -> Result<Option<(String, T)>, ErrorTypes> {
433 if queues.is_empty() {
434 return Ok(Default::default())
435 }
436
437 let _timeout = timeout.unwrap_or_default().as_secs_f64();
438 let mut names = vec![];
440 for queue in queues {
441 names.push(queue.name.as_str());
442 }
443 let response: Option<(String, Vec<u8>, f64)> = retry_call!(queues[0].store.pool, bzpopmin, &names, _timeout)?;
444
445 Ok(match response {
446 Some((queue, value, _)) => Some((queue, Self::decode(&value)?)),
447 None => None,
448 })
449 }
450
451 pub async fn all_length(queues: &[&PriorityQueue<T>]) -> Result<Vec<u64>, ErrorTypes> {
453 if queues.is_empty() {
454 return Ok(Default::default())
455 }
456
457 let mut pipe = redis::pipe();
458 for que in queues {
459 pipe.zcard(&que.name);
460 }
461
462 Ok(retry_call!(method, queues[0].store.pool, pipe, query_async)?)
463 }
464
465
466}
467
468pub struct MultiQueue<Message: Serialize + DeserializeOwned> {
470 store: Arc<RedisObjects>,
471 prefix: String,
472 _data: PhantomData<Message>,
473}
474
475impl<Message: Serialize + DeserializeOwned> MultiQueue<Message> {
476 pub(crate) fn new(prefix: String, store: Arc<RedisObjects>) -> Self {
477 Self {store, prefix, _data: Default::default()}
478 }
479
480 pub async fn delete(&self, name: &str) -> Result<(), ErrorTypes> {
482 retry_call!(self.store.pool, del, self.prefix.clone() + name)
483 }
484
485 pub async fn length(&self, name: &str) -> Result<u64, ErrorTypes> {
487 retry_call!(self.store.pool, llen, self.prefix.clone() + name)
488 }
489
490 pub async fn pop_nonblocking(&self, name: &str) -> Result<Option<Message>, ErrorTypes> {
492 let result: Option<String> = retry_call!(self.store.pool, lpop, self.prefix.clone() + name, None)?;
493 match result {
494 Some(result) => Ok(serde_json::from_str(&result)?),
495 None => Ok(None)
496 }
497 }
498
499 pub async fn pop(&self, name: &str, timeout: Duration) -> Result<Option<Message>, ErrorTypes> {
501 let result: Option<(String, String)> = retry_call!(self.store.pool, blpop, self.prefix.clone() + name, timeout.as_secs_f64())?;
502 match result {
503 Some((_, result)) => Ok(serde_json::from_str(&result)?),
504 None => Ok(None),
505 }
506 }
507
508 pub async fn push(&self, name: &str, message: &Message) -> Result<(), ErrorTypes> {
510 let _: () = retry_call!(self.store.pool, rpush, self.prefix.clone() + name, serde_json::to_string(message)?)?;
511 Ok(())
512 }
513}