1use super::{expire::Expiration, item::CatalogItem};
2use chrono::Utc;
3use core::f64;
4use redis::{ConnectionLike, RedisResult};
5use serde::{de::DeserializeOwned, Serialize};
6use std::{fmt::Debug, marker::PhantomData, num::NonZero};
7use uuid::Uuid;
8
9#[derive(Debug, Clone)]
10pub struct Catalog<I>
11where
12 I: Debug + Serialize + DeserializeOwned,
13{
14 root_namespace: String,
15 name: String,
16 catalog_key: String,
17 item_expirations_key: String,
18 checkout_expirations_key: String,
19 default_item_expiration: Expiration,
20 default_checkout_expiration: Expiration,
21 _item_type: PhantomData<CatalogItem<I>>,
22}
23
24impl<I> Catalog<I>
25where
26 I: Debug + Serialize + DeserializeOwned,
27{
28 pub fn new(
31 root_namespace: String,
32 name: String,
33 default_item_expiration: Expiration,
34 default_checkout_expiration: Expiration,
35 ) -> Self {
36 let catalog_ns = format!("{}:{}", root_namespace, name);
37 let catalog_key = format!("{}:catalog", catalog_ns);
38 let item_expirations_key = format!("{}:item-expirations", catalog_ns);
39 let checkout_expirations_key = format!("{}:checkout-expirations", catalog_ns);
40
41 Self {
42 root_namespace,
43 name,
44 catalog_key,
45 item_expirations_key,
46 checkout_expirations_key,
47 default_item_expiration,
48 default_checkout_expiration,
49 _item_type: PhantomData::<CatalogItem<I>>,
50 }
51 }
52
53 pub fn root_namespace(&self) -> &str {
55 self.root_namespace.as_str()
56 }
57
58 pub fn name(&self) -> &str {
60 self.name.as_str()
61 }
62
63 pub fn catalog_key(&self) -> &str {
65 self.catalog_key.as_str()
66 }
67
68 pub fn catalog_expirations_key(&self) -> &str {
70 self.item_expirations_key.as_str()
71 }
72
73 pub fn checkouts_expirations_key(&self) -> &str {
75 self.checkout_expirations_key.as_str()
76 }
77
78 pub fn default_item_expiration(&self) -> Expiration {
80 self.default_item_expiration
81 }
82
83 pub fn default_checkout_expiration(&self) -> Expiration {
85 self.default_checkout_expiration
86 }
87
88 pub fn destroy_catalog<C>(self, con: &mut C) -> RedisResult<i64>
90 where
91 C: ConnectionLike,
92 {
93 let keys = &[
94 self.catalog_key,
95 self.item_expirations_key,
96 self.checkout_expirations_key,
97 ];
98 redis::transaction(con, keys, |trc, pipe| pipe.del(keys).query(trc)).map(|(n,)| n)
99 }
100
101 fn register_with_expiration_f64_timestamp<C>(
102 &self,
103 con: &mut C,
104 item: CatalogItem<I>,
105 expires_on: f64,
106 ) -> RedisResult<(i64, i64)>
107 where
108 C: ConnectionLike,
109 {
110 let keys = &[
111 &self.catalog_key,
112 &self.item_expirations_key,
113 &self.checkout_expirations_key,
114 ];
115 let item_id = item.id.to_string();
116 redis::transaction(con, keys, move |trc, pipe| {
117 pipe.zadd(&self.item_expirations_key, &item_id, expires_on)
118 .hset(&self.catalog_key, &item_id, &item)
119 .query(trc)
120 })
121 }
122
123 pub fn register<C>(&self, con: &mut C, item: CatalogItem<I>) -> RedisResult<(i64, i64)>
125 where
126 C: ConnectionLike,
127 {
128 let expires_on = item
129 .expires_on
130 .unwrap_or_else(|| self.default_item_expiration.as_f64_timestamp());
131 self.register_with_expiration_f64_timestamp(con, item, expires_on)
132 }
133
134 pub fn register_with_expiration<C>(
136 &self,
137 con: &mut C,
138 item: CatalogItem<I>,
139 expiration: Expiration,
140 ) -> RedisResult<(i64, i64)>
141 where
142 C: ConnectionLike,
143 {
144 let expires_on = expiration.as_f64_timestamp();
145 self.register_with_expiration_f64_timestamp(con, item, expires_on)
146 }
147
148 fn register_multiple_with_f64_timestamp_expirations<C>(
149 &self,
150 con: &mut C,
151 items: &[CatalogItem<I>],
152 expirations: &[f64],
153 ) -> RedisResult<(i64, bool)>
154 where
155 C: ConnectionLike,
156 {
157 debug_assert_eq!(expirations.len(), items.len());
158
159 let keys = &[
160 &self.catalog_key,
161 &self.item_expirations_key,
162 &self.checkout_expirations_key,
163 ];
164
165 let scores_members: Vec<(&f64, String)> = expirations
166 .iter()
167 .zip(items.iter().map(|item| item.id.to_string()))
168 .collect();
169
170 let item_kvs: Vec<(String, &CatalogItem<I>)> = items
171 .iter()
172 .map(|item| (item.id.to_string(), item))
173 .collect();
174
175 redis::transaction(con, keys, move |trc, pipe| {
176 let result: RedisResult<(i64, String)> = pipe
177 .zadd_multiple(&self.item_expirations_key, &scores_members)
178 .hset_multiple(&self.catalog_key, &item_kvs)
179 .query(trc);
180
181 result.map(|(z, h)| Some((z, h == "OK")))
182 })
183 }
184
185 pub fn register_multiple<C>(
187 &self,
188 con: &mut C,
189 items: &[CatalogItem<I>],
190 ) -> RedisResult<(i64, bool)>
191 where
192 C: ConnectionLike,
193 {
194 let default_expiration = self.default_item_expiration.as_f64_timestamp();
195 let expirations: Vec<f64> = items
196 .iter()
197 .map(|item| item.expires_on.unwrap_or(default_expiration))
198 .collect();
199
200 self.register_multiple_with_f64_timestamp_expirations(con, items, &expirations)
201 }
202
203 pub fn register_multiple_with_expiration<C>(
205 &self,
206 con: &mut C,
207 items: &[CatalogItem<I>],
208 expiration: Expiration,
209 ) -> RedisResult<(i64, bool)>
210 where
211 C: ConnectionLike,
212 {
213 let expiration = expiration.as_f64_timestamp();
214 let expirations = vec![expiration; items.len()];
215 self.register_multiple_with_f64_timestamp_expirations(con, items, &expirations)
216 }
217
218 fn checkout_with_f64_timestamp_timeout<C>(
219 &self,
220 con: &mut C,
221 timeout_on: f64,
222 ) -> RedisResult<Option<CatalogItem<I>>>
223 where
224 C: ConnectionLike,
225 {
226 let keys = &[
227 &self.catalog_key,
228 &self.item_expirations_key,
229 &self.checkout_expirations_key,
230 ];
231
232 redis::transaction(con, keys, |trc, pipe| {
233 let (items_scores,): (Vec<(String, f64)>,) =
234 pipe.zpopmin(&self.item_expirations_key, 1).query(trc)?;
235
236 let result = if let Some((item_id, _item_expiration)) = items_scores.first() {
237 pipe.clear();
238 let (queried_item,): (Option<CatalogItem<I>>,) =
239 pipe.hget(&self.catalog_key, item_id).query(trc)?;
240
241 if queried_item.is_some() {
242 pipe.clear();
243 let _: (i64,) = pipe
244 .zadd(&self.checkout_expirations_key, item_id, timeout_on)
245 .query(trc)?;
246 }
247
248 queried_item
249 } else {
250 None
251 };
252
253 RedisResult::Ok(Some(result))
254 })
255 }
256
257 pub fn checkout<C>(&self, con: &mut C) -> RedisResult<Option<CatalogItem<I>>>
259 where
260 C: ConnectionLike,
261 {
262 let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
263 self.checkout_with_f64_timestamp_timeout(con, timeout_on)
264 }
265
266 pub fn checkout_with_timeout<C>(
268 &self,
269 con: &mut C,
270 timeout: Expiration,
271 ) -> RedisResult<Option<CatalogItem<I>>>
272 where
273 C: ConnectionLike,
274 {
275 let timeout_on = timeout.as_f64_timestamp();
276 self.checkout_with_f64_timestamp_timeout(con, timeout_on)
277 }
278
279 fn checkout_multiple_with_f64_timestamp_timeout<C>(
280 &self,
281 con: &mut C,
282 count: NonZero<usize>,
283 timeout_on: f64,
284 ) -> RedisResult<Vec<CatalogItem<I>>>
285 where
286 C: ConnectionLike,
287 {
288 let keys = &[
289 &self.catalog_key,
290 &self.item_expirations_key,
291 &self.checkout_expirations_key,
292 ];
293
294 redis::transaction(con, keys, |trc, pipe| {
295 let (item_expirations,): (Vec<(String, f64)>,) = pipe
296 .zpopmin(&self.item_expirations_key, count.get() as isize)
297 .query(trc)?;
298 let item_ids: Vec<String> = item_expirations.into_iter().map(|(id, _)| id).collect();
299 pipe.clear();
300 let (queried_items,): (Vec<Option<CatalogItem<I>>>,) =
301 pipe.hmget(&self.catalog_key, &item_ids).query(trc)?;
302 let found_items: Vec<CatalogItem<I>> = queried_items.into_iter().flatten().collect();
303
304 if !found_items.is_empty() {
305 pipe.clear();
306 let scores_ids: Vec<(f64, String)> = found_items
307 .iter()
308 .map(|item| (timeout_on, item.id.to_string()))
309 .collect();
310
311 let _: (String,) = pipe
312 .zadd_multiple(&self.checkout_expirations_key, &scores_ids)
313 .query(trc)?;
314 }
315
316 RedisResult::Ok(Some(found_items))
317 })
318 }
319
320 pub fn checkout_multiple<C>(
322 &self,
323 con: &mut C,
324 count: NonZero<usize>,
325 ) -> RedisResult<Vec<CatalogItem<I>>>
326 where
327 C: ConnectionLike,
328 {
329 let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
330 self.checkout_multiple_with_f64_timestamp_timeout(con, count, timeout_on)
331 }
332
333 pub fn checkout_multiple_with_timeout<C>(
335 &self,
336 con: &mut C,
337 count: NonZero<usize>,
338 timeout: Expiration,
339 ) -> RedisResult<Vec<CatalogItem<I>>>
340 where
341 C: ConnectionLike,
342 {
343 let timeout_on = timeout.as_f64_timestamp();
344 self.checkout_multiple_with_f64_timestamp_timeout(con, count, timeout_on)
345 }
346
347 fn checkout_by_id_with_f64_timestamp_timeout<C>(
348 &self,
349 con: &mut C,
350 id: Uuid,
351 timeout_on: f64,
352 ) -> RedisResult<Option<CatalogItem<I>>>
353 where
354 C: ConnectionLike,
355 {
356 let keys = &[
357 &self.catalog_key,
358 &self.item_expirations_key,
359 &self.checkout_expirations_key,
360 ];
361 let item_id = id.to_string();
362
363 redis::transaction(con, keys, |trc, pipe| {
364 let (n,): (i64,) = pipe.zrem(&self.item_expirations_key, &item_id).query(trc)?;
365 if n == 0 {
366 return RedisResult::Ok(Some(None));
367 }
368 pipe.clear();
369
370 let (queried_item,): (Option<CatalogItem<I>>,) =
371 pipe.hget(&self.catalog_key, &item_id).query(trc)?;
372 if queried_item.is_some() {
373 pipe.clear();
374 let _: (i64,) = pipe
375 .zadd(&self.checkout_expirations_key, &item_id, timeout_on)
376 .query(trc)?;
377 }
378
379 RedisResult::Ok(Some(queried_item))
380 })
381 }
382
383 pub fn checkout_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<Option<CatalogItem<I>>>
385 where
386 C: ConnectionLike,
387 {
388 let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
389 self.checkout_by_id_with_f64_timestamp_timeout(con, id, timeout_on)
390 }
391
392 pub fn checkout_by_id_with_timeout<C>(
394 &self,
395 con: &mut C,
396 id: Uuid,
397 timeout: Expiration,
398 ) -> RedisResult<Option<CatalogItem<I>>>
399 where
400 C: ConnectionLike,
401 {
402 let timeout_on = timeout.as_f64_timestamp();
403 self.checkout_by_id_with_f64_timestamp_timeout(con, id, timeout_on)
404 }
405
406 fn checkout_multiple_by_id_with_f64_timestamp_timeout<C>(
407 &self,
408 con: &mut C,
409 ids: &[Uuid],
410 timeout_on: f64,
411 ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
412 where
413 C: ConnectionLike,
414 {
415 let keys = &[
416 &self.catalog_key,
417 &self.item_expirations_key,
418 &self.checkout_expirations_key,
419 ];
420 let item_ids: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
421
422 redis::transaction(con, keys, |trc, pipe| {
423 let (scores,): (Vec<Option<f64>>,) = pipe
424 .zscore_multiple(&self.item_expirations_key, &item_ids)
425 .query(trc)?;
426 pipe.clear();
427 let _: (i64,) = pipe
428 .zrem(&self.item_expirations_key, &item_ids)
429 .query(trc)?;
430 let found_ids: Vec<&String> = item_ids
431 .iter()
432 .zip(scores.iter())
433 .filter_map(|(id, score)| score.map(|_| id))
434 .collect();
435
436 let result = if !found_ids.is_empty() {
437 pipe.clear();
438 let (queried_items,): (Vec<Option<CatalogItem<I>>>,) =
439 pipe.hmget(&self.catalog_key, &found_ids).query(trc)?;
440 let found_items: Vec<&CatalogItem<I>> = queried_items.iter().flatten().collect();
441 if !found_items.is_empty() {
442 let scores_ids: Vec<(f64, String)> = found_items
443 .iter()
444 .map(|item| (timeout_on, item.id.to_string()))
445 .collect();
446
447 pipe.clear();
448 let _: (i64,) = pipe
449 .zadd_multiple(&self.checkout_expirations_key, &scores_ids)
450 .query(trc)?;
451 }
452
453 queried_items
454 } else {
455 Vec::new()
456 };
457
458 RedisResult::Ok(Some(result))
459 })
460 }
461
462 pub fn checkout_multiple_by_id<C>(
464 &self,
465 con: &mut C,
466 ids: &[Uuid],
467 ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
468 where
469 C: ConnectionLike,
470 {
471 let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
472 self.checkout_multiple_by_id_with_f64_timestamp_timeout(con, ids, timeout_on)
473 }
474
475 pub fn checkout_multiple_by_id_with_timeout<C>(
477 &self,
478 con: &mut C,
479 ids: &[Uuid],
480 timeout: Expiration,
481 ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
482 where
483 C: ConnectionLike,
484 {
485 let timeout_on = timeout.as_f64_timestamp();
486 self.checkout_multiple_by_id_with_f64_timestamp_timeout(con, ids, timeout_on)
487 }
488
489 pub fn expire_items<C>(&self, con: &mut C) -> RedisResult<(i64, i64)>
491 where
492 C: ConnectionLike,
493 {
494 let now = Utc::now();
495 let ts = now.timestamp() as f64;
496 let keys = &[
497 &self.catalog_key,
498 &self.item_expirations_key,
499 &self.checkout_expirations_key,
500 ];
501
502 redis::transaction(con, keys, |trc, pipe| {
503 let (item_ids,): (Vec<String>,) = pipe
504 .zrangebyscore(&self.item_expirations_key, 0, ts)
505 .query(trc)?;
506
507 let result: (i64, i64) = if !item_ids.is_empty() {
508 pipe.clear();
509 pipe.hdel(&self.catalog_key, &item_ids)
510 .zrem(&self.item_expirations_key, &item_ids)
511 .query(trc)?
512 } else {
513 (0, 0)
514 };
515
516 RedisResult::Ok(Some(result))
517 })
518 }
519
520 pub fn expire_and_get_items<C>(&self, con: &mut C) -> RedisResult<Vec<CatalogItem<I>>>
522 where
523 C: ConnectionLike,
524 {
525 let now = Utc::now();
526 let ts = now.timestamp() as f64;
527 let keys = &[
528 &self.catalog_key,
529 &self.item_expirations_key,
530 &self.checkout_expirations_key,
531 ];
532
533 redis::transaction(con, keys, |trc, pipe| {
534 let (item_ids,): (Vec<String>,) = pipe
535 .zrangebyscore(&self.item_expirations_key, f64::NEG_INFINITY, ts)
536 .query(trc)?;
537
538 let result = if !item_ids.is_empty() {
539 pipe.clear();
540 let (items,): (Vec<CatalogItem<I>>,) =
541 pipe.hmget(&self.catalog_key, &item_ids).query(trc)?;
542 pipe.clear();
543 let _: (i64, i64) = pipe
544 .hdel(&self.catalog_key, &item_ids)
545 .zrem(&self.item_expirations_key, &item_ids)
546 .query(trc)?;
547
548 items
549 } else {
550 Vec::new()
551 };
552
553 RedisResult::Ok(Some(result))
554 })
555 }
556
557 pub fn timeout_checkouts<C>(&self, con: &mut C) -> RedisResult<(i64, i64)>
562 where
563 C: ConnectionLike,
564 {
565 let now = Utc::now();
566 let ts = now.timestamp() as f64;
567 let keys = &[
568 &self.catalog_key,
569 &self.item_expirations_key,
570 &self.checkout_expirations_key,
571 ];
572
573 redis::transaction(con, keys, |trc, pipe| {
574 let (checked_out_item_ids,): (Vec<String>,) = pipe
575 .zrangebyscore(&self.checkout_expirations_key, f64::NEG_INFINITY, ts)
576 .query(trc)?;
577
578 let result = if !checked_out_item_ids.is_empty() {
579 pipe.clear();
580 let (items,): (Vec<Option<CatalogItem<I>>>,) = pipe
581 .hmget(&self.catalog_key, &checked_out_item_ids)
582 .query(trc)?;
583 let items: Vec<(&String, &CatalogItem<I>)> = checked_out_item_ids
584 .iter()
585 .zip(items.iter())
586 .filter_map(|(score, item)| item.as_ref().map(|item| (score, item)))
587 .collect();
588
589 let expirations: Vec<(f64, &String)> = items
590 .iter()
591 .map(|(item_id, item)| {
592 let expires_on = item
593 .expires_on
594 .unwrap_or(self.default_item_expiration.as_f64_timestamp());
595 (expires_on, *item_id)
596 })
597 .collect();
598
599 pipe.clear();
600 pipe.zadd_multiple(&self.item_expirations_key, &expirations)
601 .zrem(&self.checkout_expirations_key, &checked_out_item_ids)
602 .query(trc)?
603 } else {
604 (0, 0)
605 };
606
607 RedisResult::Ok(Some(result))
608 })
609 }
610
611 pub fn relinquish_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<(i64, i64)>
616 where
617 C: ConnectionLike,
618 {
619 let id = id.to_string();
620 let keys = &[
621 &self.catalog_key,
622 &self.item_expirations_key,
623 &self.checkout_expirations_key,
624 ];
625
626 redis::transaction(con, keys, |trc, pipe| {
627 let (zc,): (i64,) = pipe.zrem(&self.checkout_expirations_key, &id).query(trc)?;
628 let result = if zc == 1 {
629 pipe.clear();
630 let (item,): (CatalogItem<I>,) = pipe.hget(&self.catalog_key, &id).query(trc)?;
631 pipe.clear();
632 let expires_on = item
633 .expires_on
634 .unwrap_or(self.default_item_expiration.as_f64_timestamp());
635 let (zi,): (i64,) = pipe
636 .zadd(&self.item_expirations_key, &id, expires_on)
637 .query(trc)?;
638 (zc, zi)
639 } else {
640 (0, 0)
641 };
642
643 RedisResult::Ok(Some(result))
644 })
645 }
646
647 pub fn delete_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<(i64, i64, i64)>
649 where
650 C: ConnectionLike,
651 {
652 let id = id.to_string();
653 let keys = &[
654 &self.catalog_key,
655 &self.item_expirations_key,
656 &self.checkout_expirations_key,
657 ];
658
659 redis::transaction(con, keys, |trc, pipe| {
660 pipe.zrem(&self.item_expirations_key, &id)
661 .zrem(&self.checkout_expirations_key, &id)
662 .hdel(&self.catalog_key, &id)
663 .query(trc)
664 })
665 }
666
667 pub fn delete_and_get_by_id<C>(
669 &self,
670 con: &mut C,
671 id: Uuid,
672 ) -> RedisResult<Option<CatalogItem<I>>>
673 where
674 C: ConnectionLike,
675 {
676 let id = id.to_string();
677 let keys = &[
678 &self.catalog_key,
679 &self.item_expirations_key,
680 &self.checkout_expirations_key,
681 ];
682
683 redis::transaction(con, keys, |trc, pipe| {
684 let (_, _, item, _): (i64, i64, Option<CatalogItem<I>>, i64) = pipe
685 .zrem(&self.item_expirations_key, &id)
686 .zrem(&self.checkout_expirations_key, &id)
687 .hget(&self.catalog_key, &id)
688 .hdel(&self.catalog_key, &id)
689 .query(trc)?;
690
691 RedisResult::Ok(Some(item))
692 })
693 }
694
695 pub fn delete_multiple_by_id<C>(
697 &self,
698 con: &mut C,
699 ids: &[Uuid],
700 ) -> RedisResult<(i64, i64, i64)>
701 where
702 C: ConnectionLike,
703 {
704 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
705 let keys = &[
706 &self.catalog_key,
707 &self.item_expirations_key,
708 &self.checkout_expirations_key,
709 ];
710
711 redis::transaction(con, keys, |trc, pipe| {
712 pipe.zrem(&self.item_expirations_key, &id_strings)
713 .zrem(&self.checkout_expirations_key, &id_strings)
714 .hdel(&self.catalog_key, &id_strings)
715 .query(trc)
716 })
717 }
718
719 pub fn delete_and_get_multiple_by_id<C>(
721 &self,
722 con: &mut C,
723 ids: &[Uuid],
724 ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
725 where
726 C: ConnectionLike,
727 {
728 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
729 let keys = &[
730 &self.catalog_key,
731 &self.item_expirations_key,
732 &self.checkout_expirations_key,
733 ];
734
735 redis::transaction(con, keys, |trc, pipe| {
736 let (_, _, items, _): (i64, i64, Vec<Option<CatalogItem<I>>>, i64) = pipe
737 .zrem(&self.item_expirations_key, &id_strings)
738 .zrem(&self.checkout_expirations_key, &id_strings)
739 .hmget(&self.catalog_key, &id_strings)
740 .hdel(&self.catalog_key, &id_strings)
741 .query(trc)?;
742
743 RedisResult::Ok(Some(items))
744 })
745 }
746}