rcqs/
catalog.rs

1use super::{expire::Expiration, item::CatalogItem};
2use chrono::Utc;
3use core::f64;
4use redis::{ConnectionLike, RedisResult};
5use serde::{de::DeserializeOwned, Serialize};
6use std::{fmt::Debug, marker::PhantomData, num::NonZero};
7use uuid::Uuid;
8
9#[derive(Debug, Clone)]
10pub struct Catalog<I>
11where
12    I: Debug + Serialize + DeserializeOwned,
13{
14    root_namespace: String,
15    name: String,
16    catalog_key: String,
17    item_expirations_key: String,
18    checkout_expirations_key: String,
19    default_item_expiration: Expiration,
20    default_checkout_expiration: Expiration,
21    _item_type: PhantomData<CatalogItem<I>>,
22}
23
24impl<I> Catalog<I>
25where
26    I: Debug + Serialize + DeserializeOwned,
27{
28    /// Create a new [`Catalog`] with its keys at a given root namespace and name.
29    /// Requires a default item expiration and a default checkout expiration.
30    pub fn new(
31        root_namespace: String,
32        name: String,
33        default_item_expiration: Expiration,
34        default_checkout_expiration: Expiration,
35    ) -> Self {
36        let catalog_ns = format!("{}:{}", root_namespace, name);
37        let catalog_key = format!("{}:catalog", catalog_ns);
38        let item_expirations_key = format!("{}:item-expirations", catalog_ns);
39        let checkout_expirations_key = format!("{}:checkout-expirations", catalog_ns);
40
41        Self {
42            root_namespace,
43            name,
44            catalog_key,
45            item_expirations_key,
46            checkout_expirations_key,
47            default_item_expiration,
48            default_checkout_expiration,
49            _item_type: PhantomData::<CatalogItem<I>>,
50        }
51    }
52
53    /// Root namespace or prefix for keys related to this [`Catalog`].
54    pub fn root_namespace(&self) -> &str {
55        self.root_namespace.as_str()
56    }
57
58    /// Name of this [`Catalog`].
59    pub fn name(&self) -> &str {
60        self.name.as_str()
61    }
62
63    /// Key for hash containing items.
64    pub fn catalog_key(&self) -> &str {
65        self.catalog_key.as_str()
66    }
67
68    /// Key for ordered set containing item expirations.
69    pub fn catalog_expirations_key(&self) -> &str {
70        self.item_expirations_key.as_str()
71    }
72
73    /// Key for ordered set containing checkout expirations.
74    pub fn checkouts_expirations_key(&self) -> &str {
75        self.checkout_expirations_key.as_str()
76    }
77
78    /// Default item expiration.
79    pub fn default_item_expiration(&self) -> Expiration {
80        self.default_item_expiration
81    }
82
83    /// Default checkout expiration.
84    pub fn default_checkout_expiration(&self) -> Expiration {
85        self.default_checkout_expiration
86    }
87
88    /// Delete all catalog keys from the database.
89    pub fn destroy_catalog<C>(self, con: &mut C) -> RedisResult<i64>
90    where
91        C: ConnectionLike,
92    {
93        let keys = &[
94            self.catalog_key,
95            self.item_expirations_key,
96            self.checkout_expirations_key,
97        ];
98        redis::transaction(con, keys, |trc, pipe| pipe.del(keys).query(trc)).map(|(n,)| n)
99    }
100
101    fn register_with_expiration_f64_timestamp<C>(
102        &self,
103        con: &mut C,
104        item: CatalogItem<I>,
105        expires_on: f64,
106    ) -> RedisResult<(i64, i64)>
107    where
108        C: ConnectionLike,
109    {
110        let keys = &[
111            &self.catalog_key,
112            &self.item_expirations_key,
113            &self.checkout_expirations_key,
114        ];
115        let item_id = item.id.to_string();
116        redis::transaction(con, keys, move |trc, pipe| {
117            pipe.zadd(&self.item_expirations_key, &item_id, expires_on)
118                .hset(&self.catalog_key, &item_id, &item)
119                .query(trc)
120        })
121    }
122
123    /// Register item using its expiration or the catalog's default if none.
124    pub fn register<C>(&self, con: &mut C, item: CatalogItem<I>) -> RedisResult<(i64, i64)>
125    where
126        C: ConnectionLike,
127    {
128        let expires_on = item
129            .expires_on
130            .unwrap_or_else(|| self.default_item_expiration.as_f64_timestamp());
131        self.register_with_expiration_f64_timestamp(con, item, expires_on)
132    }
133
134    /// Register item using the provided expiration.
135    pub fn register_with_expiration<C>(
136        &self,
137        con: &mut C,
138        item: CatalogItem<I>,
139        expiration: Expiration,
140    ) -> RedisResult<(i64, i64)>
141    where
142        C: ConnectionLike,
143    {
144        let expires_on = expiration.as_f64_timestamp();
145        self.register_with_expiration_f64_timestamp(con, item, expires_on)
146    }
147
148    fn register_multiple_with_f64_timestamp_expirations<C>(
149        &self,
150        con: &mut C,
151        items: &[CatalogItem<I>],
152        expirations: &[f64],
153    ) -> RedisResult<(i64, bool)>
154    where
155        C: ConnectionLike,
156    {
157        debug_assert_eq!(expirations.len(), items.len());
158
159        let keys = &[
160            &self.catalog_key,
161            &self.item_expirations_key,
162            &self.checkout_expirations_key,
163        ];
164
165        let scores_members: Vec<(&f64, String)> = expirations
166            .iter()
167            .zip(items.iter().map(|item| item.id.to_string()))
168            .collect();
169
170        let item_kvs: Vec<(String, &CatalogItem<I>)> = items
171            .iter()
172            .map(|item| (item.id.to_string(), item))
173            .collect();
174
175        redis::transaction(con, keys, move |trc, pipe| {
176            let result: RedisResult<(i64, String)> = pipe
177                .zadd_multiple(&self.item_expirations_key, &scores_members)
178                .hset_multiple(&self.catalog_key, &item_kvs)
179                .query(trc);
180
181            result.map(|(z, h)| Some((z, h == "OK")))
182        })
183    }
184
185    /// Register items using their expiration or the catalog's default if none.
186    pub fn register_multiple<C>(
187        &self,
188        con: &mut C,
189        items: &[CatalogItem<I>],
190    ) -> RedisResult<(i64, bool)>
191    where
192        C: ConnectionLike,
193    {
194        let default_expiration = self.default_item_expiration.as_f64_timestamp();
195        let expirations: Vec<f64> = items
196            .iter()
197            .map(|item| item.expires_on.unwrap_or(default_expiration))
198            .collect();
199
200        self.register_multiple_with_f64_timestamp_expirations(con, items, &expirations)
201    }
202
203    /// Register items using the provided expiration.
204    pub fn register_multiple_with_expiration<C>(
205        &self,
206        con: &mut C,
207        items: &[CatalogItem<I>],
208        expiration: Expiration,
209    ) -> RedisResult<(i64, bool)>
210    where
211        C: ConnectionLike,
212    {
213        let expiration = expiration.as_f64_timestamp();
214        let expirations = vec![expiration; items.len()];
215        self.register_multiple_with_f64_timestamp_expirations(con, items, &expirations)
216    }
217
218    fn checkout_with_f64_timestamp_timeout<C>(
219        &self,
220        con: &mut C,
221        timeout_on: f64,
222    ) -> RedisResult<Option<CatalogItem<I>>>
223    where
224        C: ConnectionLike,
225    {
226        let keys = &[
227            &self.catalog_key,
228            &self.item_expirations_key,
229            &self.checkout_expirations_key,
230        ];
231
232        redis::transaction(con, keys, |trc, pipe| {
233            let (items_scores,): (Vec<(String, f64)>,) =
234                pipe.zpopmin(&self.item_expirations_key, 1).query(trc)?;
235
236            let result = if let Some((item_id, _item_expiration)) = items_scores.first() {
237                pipe.clear();
238                let (queried_item,): (Option<CatalogItem<I>>,) =
239                    pipe.hget(&self.catalog_key, item_id).query(trc)?;
240
241                if queried_item.is_some() {
242                    pipe.clear();
243                    let _: (i64,) = pipe
244                        .zadd(&self.checkout_expirations_key, item_id, timeout_on)
245                        .query(trc)?;
246                }
247
248                queried_item
249            } else {
250                None
251            };
252
253            RedisResult::Ok(Some(result))
254        })
255    }
256
257    /// Checkout item using the catalog's default checkout timeout.
258    pub fn checkout<C>(&self, con: &mut C) -> RedisResult<Option<CatalogItem<I>>>
259    where
260        C: ConnectionLike,
261    {
262        let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
263        self.checkout_with_f64_timestamp_timeout(con, timeout_on)
264    }
265
266    /// Checkout item using the provided checkout timeout.
267    pub fn checkout_with_timeout<C>(
268        &self,
269        con: &mut C,
270        timeout: Expiration,
271    ) -> RedisResult<Option<CatalogItem<I>>>
272    where
273        C: ConnectionLike,
274    {
275        let timeout_on = timeout.as_f64_timestamp();
276        self.checkout_with_f64_timestamp_timeout(con, timeout_on)
277    }
278
279    fn checkout_multiple_with_f64_timestamp_timeout<C>(
280        &self,
281        con: &mut C,
282        count: NonZero<usize>,
283        timeout_on: f64,
284    ) -> RedisResult<Vec<CatalogItem<I>>>
285    where
286        C: ConnectionLike,
287    {
288        let keys = &[
289            &self.catalog_key,
290            &self.item_expirations_key,
291            &self.checkout_expirations_key,
292        ];
293
294        redis::transaction(con, keys, |trc, pipe| {
295            let (item_expirations,): (Vec<(String, f64)>,) = pipe
296                .zpopmin(&self.item_expirations_key, count.get() as isize)
297                .query(trc)?;
298            let item_ids: Vec<String> = item_expirations.into_iter().map(|(id, _)| id).collect();
299            pipe.clear();
300            let (queried_items,): (Vec<Option<CatalogItem<I>>>,) =
301                pipe.hget(&self.catalog_key, &item_ids).query(trc)?;
302            let found_items: Vec<CatalogItem<I>> = queried_items.into_iter().flatten().collect();
303
304            if !found_items.is_empty() {
305                pipe.clear();
306                let scores_ids: Vec<(f64, String)> = found_items
307                    .iter()
308                    .map(|item| (timeout_on, item.id.to_string()))
309                    .collect();
310
311                let _: (String,) = pipe
312                    .zadd_multiple(&self.checkout_expirations_key, &scores_ids)
313                    .query(trc)?;
314            }
315
316            RedisResult::Ok(Some(found_items))
317        })
318    }
319
320    /// Checkout items using the catalog's default checkout timeout.
321    pub fn checkout_multiple<C>(
322        &self,
323        con: &mut C,
324        count: NonZero<usize>,
325    ) -> RedisResult<Vec<CatalogItem<I>>>
326    where
327        C: ConnectionLike,
328    {
329        let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
330        self.checkout_multiple_with_f64_timestamp_timeout(con, count, timeout_on)
331    }
332
333    /// Checkout items using the provided checkout timeout.
334    pub fn checkout_multiple_with_timeout<C>(
335        &self,
336        con: &mut C,
337        count: NonZero<usize>,
338        timeout: Expiration,
339    ) -> RedisResult<Vec<CatalogItem<I>>>
340    where
341        C: ConnectionLike,
342    {
343        let timeout_on = timeout.as_f64_timestamp();
344        self.checkout_multiple_with_f64_timestamp_timeout(con, count, timeout_on)
345    }
346
347    fn checkout_by_id_with_f64_timestamp_timeout<C>(
348        &self,
349        con: &mut C,
350        id: Uuid,
351        timeout_on: f64,
352    ) -> RedisResult<Option<CatalogItem<I>>>
353    where
354        C: ConnectionLike,
355    {
356        let keys = &[
357            &self.catalog_key,
358            &self.item_expirations_key,
359            &self.checkout_expirations_key,
360        ];
361        let item_id = id.to_string();
362
363        redis::transaction(con, keys, |trc, pipe| {
364            let (n,): (i64,) = pipe.zrem(&self.item_expirations_key, &item_id).query(trc)?;
365            if n == 0 {
366                return RedisResult::Ok(Some(None));
367            }
368            pipe.clear();
369
370            let (queried_item,): (Option<CatalogItem<I>>,) =
371                pipe.hget(&self.catalog_key, &item_id).query(trc)?;
372            if queried_item.is_some() {
373                pipe.clear();
374                let _: (i64,) = pipe
375                    .zadd(&self.checkout_expirations_key, &item_id, timeout_on)
376                    .query(trc)?;
377            }
378
379            RedisResult::Ok(Some(queried_item))
380        })
381    }
382
383    /// Checkout item by ID using the catalog's default checkout timeout.
384    pub fn checkout_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<Option<CatalogItem<I>>>
385    where
386        C: ConnectionLike,
387    {
388        let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
389        self.checkout_by_id_with_f64_timestamp_timeout(con, id, timeout_on)
390    }
391
392    /// Checkout item by ID using the provided checkout timeout.
393    pub fn checkout_by_id_with_timeout<C>(
394        &self,
395        con: &mut C,
396        id: Uuid,
397        timeout: Expiration,
398    ) -> RedisResult<Option<CatalogItem<I>>>
399    where
400        C: ConnectionLike,
401    {
402        let timeout_on = timeout.as_f64_timestamp();
403        self.checkout_by_id_with_f64_timestamp_timeout(con, id, timeout_on)
404    }
405
406    fn checkout_multiple_by_id_with_f64_timestamp_timeout<C>(
407        &self,
408        con: &mut C,
409        ids: &[Uuid],
410        timeout_on: f64,
411    ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
412    where
413        C: ConnectionLike,
414    {
415        let keys = &[
416            &self.catalog_key,
417            &self.item_expirations_key,
418            &self.checkout_expirations_key,
419        ];
420        let item_ids: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
421
422        redis::transaction(con, keys, |trc, pipe| {
423            let (scores,): (Vec<Option<f64>>,) = pipe
424                .zscore_multiple(&self.item_expirations_key, &item_ids)
425                .query(trc)?;
426            pipe.clear();
427            let _: (i64,) = pipe
428                .zrem(&self.item_expirations_key, &item_ids)
429                .query(trc)?;
430            let found_ids: Vec<&String> = item_ids
431                .iter()
432                .zip(scores.iter())
433                .filter_map(|(id, score)| score.map(|_| id))
434                .collect();
435
436            let result = if !found_ids.is_empty() {
437                pipe.clear();
438                let (queried_items,): (Vec<Option<CatalogItem<I>>>,) =
439                    pipe.hget(&self.catalog_key, &found_ids).query(trc)?;
440                let found_items: Vec<&CatalogItem<I>> = queried_items.iter().flatten().collect();
441                if !found_items.is_empty() {
442                    let scores_ids: Vec<(f64, String)> = found_items
443                        .iter()
444                        .map(|item| (timeout_on, item.id.to_string()))
445                        .collect();
446
447                    pipe.clear();
448                    let _: (i64,) = pipe
449                        .zadd_multiple(&self.checkout_expirations_key, &scores_ids)
450                        .query(trc)?;
451                }
452
453                queried_items
454            } else {
455                Vec::new()
456            };
457
458            RedisResult::Ok(Some(result))
459        })
460    }
461
462    /// Checkout items by ID using the catalog's default checkout timeout.
463    pub fn checkout_multiple_by_id<C>(
464        &self,
465        con: &mut C,
466        ids: &[Uuid],
467    ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
468    where
469        C: ConnectionLike,
470    {
471        let timeout_on = self.default_checkout_expiration.as_f64_timestamp();
472        self.checkout_multiple_by_id_with_f64_timestamp_timeout(con, ids, timeout_on)
473    }
474
475    /// Checkout items by ID using the provided checkout timeout.
476    pub fn checkout_multiple_by_id_with_timeout<C>(
477        &self,
478        con: &mut C,
479        ids: &[Uuid],
480        timeout: Expiration,
481    ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
482    where
483        C: ConnectionLike,
484    {
485        let timeout_on = timeout.as_f64_timestamp();
486        self.checkout_multiple_by_id_with_f64_timestamp_timeout(con, ids, timeout_on)
487    }
488
489    /// Query for and remove items that should be expired from the catalog.
490    pub fn expire_items<C>(&self, con: &mut C) -> RedisResult<(i64, i64)>
491    where
492        C: ConnectionLike,
493    {
494        let now = Utc::now();
495        let ts = now.timestamp() as f64;
496        let keys = &[
497            &self.catalog_key,
498            &self.item_expirations_key,
499            &self.checkout_expirations_key,
500        ];
501
502        redis::transaction(con, keys, |trc, pipe| {
503            let (item_ids,): (Vec<String>,) = pipe
504                .zrangebyscore(&self.item_expirations_key, 0, ts)
505                .query(trc)?;
506
507            let result: (i64, i64) = if !item_ids.is_empty() {
508                pipe.clear();
509                pipe.hdel(&self.catalog_key, &item_ids)
510                    .zrem(&self.item_expirations_key, &item_ids)
511                    .query(trc)?
512            } else {
513                (0, 0)
514            };
515
516            RedisResult::Ok(Some(result))
517        })
518    }
519
520    /// Query for, remove, and return items that should be expired from the catalog.
521    pub fn expire_and_get_items<C>(&self, con: &mut C) -> RedisResult<Vec<CatalogItem<I>>>
522    where
523        C: ConnectionLike,
524    {
525        let now = Utc::now();
526        let ts = now.timestamp() as f64;
527        let keys = &[
528            &self.catalog_key,
529            &self.item_expirations_key,
530            &self.checkout_expirations_key,
531        ];
532
533        redis::transaction(con, keys, |trc, pipe| {
534            let (item_ids,): (Vec<String>,) = pipe
535                .zrangebyscore(&self.item_expirations_key, f64::NEG_INFINITY, ts)
536                .query(trc)?;
537
538            let result = if !item_ids.is_empty() {
539                pipe.clear();
540                let (items,): (Vec<CatalogItem<I>>,) =
541                    pipe.hget(&self.catalog_key, &item_ids).query(trc)?;
542                pipe.clear();
543                let _: (i64, i64) = pipe
544                    .hdel(&self.catalog_key, &item_ids)
545                    .zrem(&self.item_expirations_key, &item_ids)
546                    .query(trc)?;
547
548                items
549            } else {
550                Vec::new()
551            };
552
553            RedisResult::Ok(Some(result))
554        })
555    }
556
557    /// Query for and return items whose checkout has timed out.
558    ///
559    /// If item is somehow missing an expiration timestamp, it will be set to
560    /// the catalog's default timeout.
561    pub fn timeout_checkouts<C>(&self, con: &mut C) -> RedisResult<(i64, i64)>
562    where
563        C: ConnectionLike,
564    {
565        let now = Utc::now();
566        let ts = now.timestamp() as f64;
567        let keys = &[
568            &self.catalog_key,
569            &self.item_expirations_key,
570            &self.checkout_expirations_key,
571        ];
572
573        redis::transaction(con, keys, |trc, pipe| {
574            let (checked_out_item_ids,): (Vec<String>,) = pipe
575                .zrangebyscore(&self.checkout_expirations_key, f64::NEG_INFINITY, ts)
576                .query(trc)?;
577
578            let result = if !checked_out_item_ids.is_empty() {
579                pipe.clear();
580                let (items,): (Vec<Option<CatalogItem<I>>>,) = pipe
581                    .hget(&self.catalog_key, &checked_out_item_ids)
582                    .query(trc)?;
583                let items: Vec<(&String, &CatalogItem<I>)> = checked_out_item_ids
584                    .iter()
585                    .zip(items.iter())
586                    .filter_map(|(score, item)| item.as_ref().map(|item| (score, item)))
587                    .collect();
588
589                let expirations: Vec<(f64, &String)> = items
590                    .iter()
591                    .map(|(item_id, item)| {
592                        let expires_on = item
593                            .expires_on
594                            .unwrap_or(self.default_item_expiration.as_f64_timestamp());
595                        (expires_on, *item_id)
596                    })
597                    .collect();
598
599                pipe.clear();
600                pipe.zadd_multiple(&self.item_expirations_key, &expirations)
601                    .zrem(&self.checkout_expirations_key, &checked_out_item_ids)
602                    .query(trc)?
603            } else {
604                (0, 0)
605            };
606
607            RedisResult::Ok(Some(result))
608        })
609    }
610
611    /// Relinquish a checked out item back to the catalog ahead of the checkout timeout.
612    ///
613    /// If item is somehow missing an expiration timestamp, it will be set to
614    /// the catalog's default timeout.
615    pub fn relinquish_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<(i64, i64)>
616    where
617        C: ConnectionLike,
618    {
619        let id = id.to_string();
620        let keys = &[
621            &self.catalog_key,
622            &self.item_expirations_key,
623            &self.checkout_expirations_key,
624        ];
625
626        redis::transaction(con, keys, |trc, pipe| {
627            let (zc,): (i64,) = pipe.zrem(&self.checkout_expirations_key, &id).query(trc)?;
628            let result = if zc == 1 {
629                pipe.clear();
630                let (item,): (CatalogItem<I>,) = pipe.hget(&self.catalog_key, &id).query(trc)?;
631                pipe.clear();
632                let expires_on = item
633                    .expires_on
634                    .unwrap_or(self.default_item_expiration.as_f64_timestamp());
635                let (zi,): (i64,) = pipe
636                    .zadd(&self.item_expirations_key, &id, expires_on)
637                    .query(trc)?;
638                (zc, zi)
639            } else {
640                (0, 0)
641            };
642
643            RedisResult::Ok(Some(result))
644        })
645    }
646
647    /// Delete an item from the catalog.
648    pub fn delete_by_id<C>(&self, con: &mut C, id: Uuid) -> RedisResult<(i64, i64, i64)>
649    where
650        C: ConnectionLike,
651    {
652        let id = id.to_string();
653        let keys = &[
654            &self.catalog_key,
655            &self.item_expirations_key,
656            &self.checkout_expirations_key,
657        ];
658
659        redis::transaction(con, keys, |trc, pipe| {
660            pipe.zrem(&self.item_expirations_key, &id)
661                .zrem(&self.checkout_expirations_key, &id)
662                .hdel(&self.catalog_key, &id)
663                .query(trc)
664        })
665    }
666
667    /// Delete and get an item from the catalog.
668    pub fn delete_and_get_by_id<C>(
669        &self,
670        con: &mut C,
671        id: Uuid,
672    ) -> RedisResult<Option<CatalogItem<I>>>
673    where
674        C: ConnectionLike,
675    {
676        let id = id.to_string();
677        let keys = &[
678            &self.catalog_key,
679            &self.item_expirations_key,
680            &self.checkout_expirations_key,
681        ];
682
683        redis::transaction(con, keys, |trc, pipe| {
684            let (_, _, item, _): (i64, i64, Option<CatalogItem<I>>, i64) = pipe
685                .zrem(&self.item_expirations_key, &id)
686                .zrem(&self.checkout_expirations_key, &id)
687                .hget(&self.catalog_key, &id)
688                .hdel(&self.catalog_key, &id)
689                .query(trc)?;
690
691            RedisResult::Ok(Some(item))
692        })
693    }
694
695    /// Delete items from the catalog.
696    pub fn delete_multiple_by_id<C>(
697        &self,
698        con: &mut C,
699        ids: &[Uuid],
700    ) -> RedisResult<(i64, i64, i64)>
701    where
702        C: ConnectionLike,
703    {
704        let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
705        let keys = &[
706            &self.catalog_key,
707            &self.item_expirations_key,
708            &self.checkout_expirations_key,
709        ];
710
711        redis::transaction(con, keys, |trc, pipe| {
712            pipe.zrem(&self.item_expirations_key, &id_strings)
713                .zrem(&self.checkout_expirations_key, &id_strings)
714                .hdel(&self.catalog_key, &id_strings)
715                .query(trc)
716        })
717    }
718
719    /// Delete and get items from the catalog.
720    pub fn delete_and_get_multiple_by_id<C>(
721        &self,
722        con: &mut C,
723        ids: &[Uuid],
724    ) -> RedisResult<Vec<Option<CatalogItem<I>>>>
725    where
726        C: ConnectionLike,
727    {
728        let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
729        let keys = &[
730            &self.catalog_key,
731            &self.item_expirations_key,
732            &self.checkout_expirations_key,
733        ];
734
735        redis::transaction(con, keys, |trc, pipe| {
736            let (_, _, items, _): (i64, i64, Vec<Option<CatalogItem<I>>>, i64) = pipe
737                .zrem(&self.item_expirations_key, &id_strings)
738                .zrem(&self.checkout_expirations_key, &id_strings)
739                .hget(&self.catalog_key, &id_strings)
740                .hdel(&self.catalog_key, &id_strings)
741                .query(trc)?;
742
743            RedisResult::Ok(Some(items))
744        })
745    }
746}