Skip to main content

zrx_store/
queue.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Queue.
27
28use ahash::HashMap;
29use slab::Slab;
30use std::borrow::Borrow;
31use std::fmt::{self, Debug};
32use std::mem;
33use std::time::Instant;
34
35use crate::store::decorator::Ordered;
36use crate::store::item::{Key, Value};
37use crate::store::{
38    Store, StoreIterable, StoreIterableMut, StoreMut, StoreMutRef,
39};
40
41mod item;
42mod iter;
43
44pub use item::Item;
45pub use iter::{Iter, IterMut, Keys, Values};
46
47// ----------------------------------------------------------------------------
48// Structs
49// ----------------------------------------------------------------------------
50
51/// Queue.
52///
53/// This is a specialization of [`Store`], more specifically [`Ordered`], that
54/// maintains insertion order and allows to assign specific deadlines to items.
55/// Values themselves don't need to implement [`Ord`], since the ordering is
56/// completely independent and induced by the queue.
57///
58/// When an item is inserted, it is annotated with [`Instant::now`], which on
59/// the one hand implements insertion order, and on the other hand allows to
60/// change the ordering of an item through [`Queue::set_deadline`]. This allows
61/// to remove items from visibility until a certain point in time.
62///
63/// Additionally, [`Queue`] is not a queue in the traditional sense, since it
64/// adds queueing to a [`Store`], an immutable collection of key-value pairs.
65/// The queue is self-organizing, and iterating over it will always yield the
66/// correct order of items at that specific point in time.
67///
68/// # Examples
69///
70/// ```
71/// use zrx_store::{Queue, StoreIterable, StoreMut};
72///
73/// // Create queue and initial state
74/// let mut queue = Queue::default();
75/// queue.insert("a", 4);
76/// queue.insert("b", 2);
77/// queue.insert("c", 3);
78/// queue.insert("d", 1);
79///
80/// // Create iterator over the queue
81/// for (key, value) in &queue {
82///     println!("{key}: {value}");
83/// }
84/// ```
85#[derive(Clone)]
86pub struct Queue<K, V, S = HashMap<K, Item>> {
87    /// Underlying store.
88    store: Ordered<K, Item, S>,
89    /// Queue items.
90    items: Slab<V>,
91}
92
93// ----------------------------------------------------------------------------
94// Implementations
95// ----------------------------------------------------------------------------
96
97impl<K, V, S> Queue<K, V, S>
98where
99    K: Key,
100    S: Store<K, Item>,
101{
102    /// Creates a queue.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use std::collections::HashMap;
108    /// use zrx_store::{Queue, StoreMut};
109    ///
110    /// // Create queue
111    /// let mut queue = Queue::<_, _, HashMap<_, _>>::new();
112    /// queue.insert("key", 42);
113    /// ```
114    #[must_use]
115    pub fn new() -> Self
116    where
117        S: Default,
118    {
119        Self {
120            store: Ordered::new(),
121            items: Slab::new(),
122        }
123    }
124
125    /// Returns the deadline of the item identified by the key.
126    ///
127    /// # Examples
128    ///
129    /// ```
130    /// use std::time::Instant;
131    /// use zrx_store::{Queue, StoreMut};
132    ///
133    /// // Create queue and initial state
134    /// let mut queue = Queue::default();
135    /// queue.insert("key", 42);
136    ///
137    /// // Obtain deadline of item
138    /// let deadline = queue.get_deadline(&"key");
139    /// assert!(deadline < Some(Instant::now()));
140    /// ```
141    #[inline]
142    pub fn get_deadline(&self, key: &K) -> Option<Instant> {
143        self.store.get(key).map(Item::deadline)
144    }
145}
146
147impl<K, V, S> Queue<K, V, S>
148where
149    K: Key,
150    S: StoreMut<K, Item>,
151{
152    /// Sets the deadline of the item identified by the key.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use std::time::Instant;
158    /// use zrx_store::{Queue, StoreMut};
159    ///
160    /// // Create queue and initial state
161    /// let mut queue = Queue::default();
162    /// queue.insert("key", 42);
163    ///
164    /// // Update deadline of item
165    /// queue.set_deadline(&"key", Instant::now());
166    /// ```
167    #[inline]
168    pub fn set_deadline(
169        &mut self, key: &K, deadline: Instant,
170    ) -> Option<Instant> {
171        self.store.remove(key).and_then(|mut item| {
172            item.set_deadline(deadline);
173            self.store
174                .insert(key.clone(), item)
175                .map(|prior| prior.deadline())
176        })
177    }
178}
179
180impl<K, V, S> Queue<K, V, S>
181where
182    K: Key,
183    V: Value,
184    S: StoreMut<K, Item> + StoreIterable<K, Item>,
185{
186    /// Returns the minimum deadline of all items.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use std::time::Instant;
192    /// use zrx_store::{Queue, StoreMut};
193    ///
194    /// // Create queue and initial state
195    /// let mut queue = Queue::default();
196    /// queue.insert("key", 42);
197    ///
198    /// // Obtain minimum deadline of all items
199    /// let deadline = queue.deadline();
200    /// assert!(deadline < Some(Instant::now()));
201    ///
202    #[inline]
203    pub fn deadline(&self) -> Option<Instant> {
204        self.store.iter().next().map(|(_, item)| item.deadline())
205    }
206
207    /// Takes ownership of the next item that is due.
208    ///
209    /// Items are considered to be due if [`Instant::now`] has passed the value
210    /// stored in [`Item::deadline`], which allows to defer processing.
211    ///
212    /// # Examples
213    ///
214    /// ```
215    /// use zrx_store::{Queue, StoreMut};
216    ///
217    /// // Create queue and initial state
218    /// let mut queue = Queue::default();
219    /// queue.insert("a", 4);
220    /// queue.insert("b", 2);
221    /// queue.insert("c", 3);
222    /// queue.insert("d", 1);
223    ///
224    /// // Obtain items from queue
225    /// while let Some((key, value)) = queue.take() {
226    ///     println!("{key}: {value}");
227    /// }
228    /// ```
229    #[allow(clippy::missing_panics_doc)]
230    #[inline]
231    pub fn take(&mut self) -> Option<(K, V)> {
232        // Obtain the current instant once to select due items during iteration,
233        // or tight loops might experience slowdowns of up to a factor of 6
234        let deadline = Instant::now();
235        let opt = self.store.iter().next().and_then(|(key, item)| {
236            (item.deadline() <= deadline).then(|| key.clone())
237        });
238
239        // Remove and return the first item we found, which is the next item
240        // in current queue order that can be considered to be due
241        opt.map(|key| {
242            // We can safely use expect here, since we're iterating over a
243            // store that is synchronized with the ordering
244            self.remove(&key)
245                .map(|value| (key, value))
246                .expect("invariant")
247        })
248    }
249}
250
251// ----------------------------------------------------------------------------
252// Trait implementations
253// ----------------------------------------------------------------------------
254
255impl<K, V, S> Store<K, V> for Queue<K, V, S>
256where
257    K: Key,
258    S: Store<K, Item>,
259{
260    /// Returns a reference to the value identified by the key.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// use zrx_store::{Queue, Store, StoreMut};
266    ///
267    /// // Create queue and initial state
268    /// let mut queue = Queue::default();
269    /// queue.insert("key", 42);
270    ///
271    /// // Obtain reference to value
272    /// let value = queue.get(&"key");
273    /// assert_eq!(value, Some(&42));
274    /// ```
275    #[inline]
276    fn get<Q>(&self, key: &Q) -> Option<&V>
277    where
278        K: Borrow<Q>,
279        Q: Key,
280    {
281        match self.store.get(key) {
282            Some(item) => self.items.get(*item.data()),
283            None => None,
284        }
285    }
286
287    /// Returns whether the queue contains the key.
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// use zrx_store::{Queue, Store, StoreMut};
293    ///
294    /// // Create queue and initial state
295    /// let mut queue = Queue::default();
296    /// queue.insert("key", 42);
297    ///
298    /// // Ensure presence of key
299    /// let check = queue.contains_key(&"key");
300    /// assert_eq!(check, true);
301    /// ```
302    #[inline]
303    fn contains_key<Q>(&self, key: &Q) -> bool
304    where
305        K: Borrow<Q>,
306        Q: Key,
307    {
308        self.store.contains_key(key)
309    }
310
311    /// Returns the number of items in the queue.
312    #[inline]
313    fn len(&self) -> usize {
314        self.store.len()
315    }
316}
317
318impl<K, V, S> StoreMut<K, V> for Queue<K, V, S>
319where
320    K: Key,
321    V: Value,
322    S: StoreMut<K, Item>,
323{
324    /// Inserts the value identified by the key.
325    ///
326    /// This method only updates the data of the [`Item`], but does not change
327    /// the values of [`Item::deadline`] in case the item already exists.
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// use zrx_store::{Queue, StoreMut};
333    ///
334    /// // Create queue
335    /// let mut queue = Queue::default();
336    ///
337    /// // Insert value
338    /// let value = queue.insert("key", 42);
339    /// assert_eq!(value, None);
340    /// ```
341    #[inline]
342    fn insert(&mut self, key: K, value: V) -> Option<V> {
343        if let Some(item) = self.store.get(&key) {
344            let prior = &mut self.items[*item.data()];
345            (prior != &value).then(|| mem::replace(prior, value))
346        } else {
347            let index = self.items.insert(value);
348            self.store.insert(key, Item::new(index));
349            None
350        }
351    }
352
353    /// Removes the value identified by the key.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use zrx_store::{Queue, StoreMut};
359    ///
360    /// // Create queue and initial state
361    /// let mut queue = Queue::default();
362    /// queue.insert("key", 42);
363    ///
364    /// // Remove and return value
365    /// let value = queue.remove(&"key");
366    /// assert_eq!(value, Some(42));
367    /// ```
368    #[inline]
369    fn remove<Q>(&mut self, key: &Q) -> Option<V>
370    where
371        K: Borrow<Q>,
372        Q: Key,
373    {
374        self.store
375            .remove(key)
376            .map(|item| self.items.remove(*item.data()))
377    }
378
379    /// Removes the value identified by the key and returns both.
380    ///
381    /// # Examples
382    ///
383    /// ```
384    /// use zrx_store::{Queue, StoreMut};
385    ///
386    /// // Create queue and initial state
387    /// let mut queue = Queue::default();
388    /// queue.insert("key", 42);
389    ///
390    /// // Remove and return entry
391    /// let entry = queue.remove_entry(&"key");
392    /// assert_eq!(entry, Some(("key", 42)));
393    /// ```
394    #[inline]
395    fn remove_entry<Q>(&mut self, key: &Q) -> Option<(K, V)>
396    where
397        K: Borrow<Q>,
398        Q: Key,
399    {
400        self.store
401            .remove_entry(key)
402            .map(|(key, item)| (key, self.items.remove(*item.data())))
403    }
404
405    /// Clears the queue, removing all items.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// use zrx_store::{Queue, Store, StoreMut};
411    ///
412    /// // Create queue and initial state
413    /// let mut queue = Queue::default();
414    /// queue.insert("key", 42);
415    ///
416    /// // Clear queue
417    /// queue.clear();
418    /// assert!(queue.is_empty());
419    /// ```
420    #[inline]
421    fn clear(&mut self) {
422        self.store.clear();
423        self.items.clear();
424    }
425}
426
427impl<K, V, S> StoreMutRef<K, V> for Queue<K, V, S>
428where
429    K: Key,
430    S: StoreMut<K, Item>,
431{
432    /// Returns a mutable reference to the value identified by the key.
433    ///
434    /// # Examples
435    ///
436    /// ```
437    /// use zrx_store::{Queue, StoreMut, StoreMutRef};
438    ///
439    /// // Create queue and initial state
440    /// let mut queue = Queue::default();
441    /// queue.insert("key", 42);
442    ///
443    /// // Obtain mutable reference to value
444    /// let mut value = queue.get_mut(&"key");
445    /// assert_eq!(value, Some(&mut 42));
446    /// ```
447    #[inline]
448    fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
449    where
450        K: Borrow<Q>,
451        Q: Key,
452    {
453        match self.store.get(key) {
454            Some(item) => self.items.get_mut(*item.data()),
455            None => None,
456        }
457    }
458
459    /// Returns a mutable reference to the value or creates the default.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use zrx_store::{Queue, StoreMutRef};
465    ///
466    /// // Create queue
467    /// let mut queue = Queue::<_, i32>::default();
468    ///
469    /// // Obtain mutable reference to value
470    /// let value = queue.get_or_insert_default(&"key");
471    /// assert_eq!(value, &mut 0);
472    /// ```
473    #[inline]
474    fn get_or_insert_default(&mut self, key: &K) -> &mut V
475    where
476        V: Default,
477    {
478        if !self.store.contains_key(key) {
479            let index = self.items.insert(V::default());
480            self.store.insert(key.clone(), Item::new(index));
481        }
482
483        // We can safely use expect here, as the key is present
484        self.get_mut(key).expect("invariant")
485    }
486}
487
488// ----------------------------------------------------------------------------
489
490#[allow(clippy::into_iter_without_iter)]
491impl<'a, K, V, S> IntoIterator for &'a Queue<K, V, S>
492where
493    K: Key,
494    V: Value,
495    S: StoreIterable<K, Item>,
496{
497    type Item = (&'a K, &'a V);
498    type IntoIter = Iter<'a, K, V>;
499
500    /// Creates an iterator over the queue.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// use zrx_store::{Queue, StoreMut};
506    ///
507    /// // Create queue and initial state
508    /// let mut queue = Queue::default();
509    /// queue.insert("key", 42);
510    ///
511    /// // Create iterator over the queue
512    /// for (key, value) in &queue {
513    ///     println!("{key}: {value}");
514    /// }
515    /// ```
516    #[inline]
517    fn into_iter(self) -> Self::IntoIter {
518        self.iter()
519    }
520}
521
522#[allow(clippy::into_iter_without_iter)]
523impl<'a, K, V, S> IntoIterator for &'a mut Queue<K, V, S>
524where
525    K: Key,
526    V: Value,
527    S: StoreMut<K, Item> + StoreIterable<K, Item>,
528{
529    type Item = (&'a K, &'a mut V);
530    type IntoIter = IterMut<'a, K, V>;
531
532    /// Creates a mutable iterator over the queue.
533    ///
534    /// # Examples
535    ///
536    /// ```
537    /// use zrx_store::{Queue, StoreMut};
538    ///
539    /// // Create queue and initial state
540    /// let mut queue = Queue::default();
541    /// queue.insert("key", 42);
542    ///
543    /// // Create iterator over the queue
544    /// for (key, value) in &mut queue {
545    ///     println!("{key}: {value}");
546    /// }
547    /// ```
548    #[inline]
549    fn into_iter(self) -> Self::IntoIter {
550        self.iter_mut()
551    }
552}
553
554// ----------------------------------------------------------------------------
555
556impl<K, V> Default for Queue<K, V>
557where
558    K: Key,
559{
560    /// Creates a queue with [`HashMap::default`][] as a store.
561    ///
562    /// Note that this method does not allow to customize the [`BuildHasher`][],
563    /// but uses [`ahash`] by default, which is the fastest known hasher.
564    ///
565    /// [`BuildHasher`]: std::hash::BuildHasher
566    /// [`HashMap::default`]: Default::default
567    ///
568    /// # Examples
569    ///
570    /// ```
571    /// use zrx_store::{Queue, StoreMut};
572    ///
573    /// // Create queue
574    /// let mut queue = Queue::default();
575    ///
576    /// // Insert value
577    /// queue.insert("key", 42);
578    /// ```
579    #[inline]
580    fn default() -> Self {
581        Self::new()
582    }
583}
584
585// ----------------------------------------------------------------------------
586
587impl<K, V, S> Debug for Queue<K, V, S>
588where
589    K: Debug,
590    V: Debug,
591    S: Debug,
592{
593    /// Formats the queue for debugging.
594    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
595        f.debug_struct("Queue")
596            .field("store", &self.store)
597            .field("items", &self.items)
598            .finish()
599    }
600}