zrx_store/queue.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Queue.
27
28use ahash::HashMap;
29use slab::Slab;
30use std::borrow::Borrow;
31use std::fmt::{self, Debug};
32use std::mem;
33use std::time::Instant;
34
35use crate::store::decorator::Ordered;
36use crate::store::item::{Key, Value};
37use crate::store::{
38 Store, StoreIterable, StoreIterableMut, StoreMut, StoreMutRef,
39};
40
41mod item;
42mod iter;
43
44pub use item::Item;
45pub use iter::{Iter, IterMut, Keys, Values};
46
47// ----------------------------------------------------------------------------
48// Structs
49// ----------------------------------------------------------------------------
50
51/// Queue.
52///
53/// This is a specialization of [`Store`], more specifically [`Ordered`], that
54/// maintains insertion order and allows to assign specific deadlines to items.
55/// Values themselves don't need to implement [`Ord`], since the ordering is
56/// completely independent and induced by the queue.
57///
58/// When an item is inserted, it is annotated with [`Instant::now`], which on
59/// the one hand implements insertion order, and on the other hand allows to
60/// change the ordering of an item through [`Queue::set_deadline`]. This allows
61/// to remove items from visibility until a certain point in time.
62///
63/// Additionally, [`Queue`] is not a queue in the traditional sense, since it
64/// adds queueing to a [`Store`], an immutable collection of key-value pairs.
65/// The queue is self-organizing, and iterating over it will always yield the
66/// correct order of items at that specific point in time.
67///
68/// # Examples
69///
70/// ```
71/// use zrx_store::{Queue, StoreIterable, StoreMut};
72///
73/// // Create queue and initial state
74/// let mut queue = Queue::default();
75/// queue.insert("a", 4);
76/// queue.insert("b", 2);
77/// queue.insert("c", 3);
78/// queue.insert("d", 1);
79///
80/// // Create iterator over the queue
81/// for (key, value) in &queue {
82/// println!("{key}: {value}");
83/// }
84/// ```
85#[derive(Clone)]
86pub struct Queue<K, V, S = HashMap<K, Item>> {
87 /// Underlying store.
88 store: Ordered<K, Item, S>,
89 /// Queue items.
90 items: Slab<V>,
91}
92
93// ----------------------------------------------------------------------------
94// Implementations
95// ----------------------------------------------------------------------------
96
97impl<K, V, S> Queue<K, V, S>
98where
99 K: Key,
100 S: Store<K, Item>,
101{
102 /// Creates a queue.
103 ///
104 /// # Examples
105 ///
106 /// ```
107 /// use std::collections::HashMap;
108 /// use zrx_store::{Queue, StoreMut};
109 ///
110 /// // Create queue
111 /// let mut queue = Queue::<_, _, HashMap<_, _>>::new();
112 /// queue.insert("key", 42);
113 /// ```
114 #[must_use]
115 pub fn new() -> Self
116 where
117 S: Default,
118 {
119 Self {
120 store: Ordered::new(),
121 items: Slab::new(),
122 }
123 }
124
125 /// Returns the deadline of the item identified by the key.
126 ///
127 /// # Examples
128 ///
129 /// ```
130 /// use std::time::Instant;
131 /// use zrx_store::{Queue, StoreMut};
132 ///
133 /// // Create queue and initial state
134 /// let mut queue = Queue::default();
135 /// queue.insert("key", 42);
136 ///
137 /// // Obtain deadline of item
138 /// let deadline = queue.get_deadline(&"key");
139 /// assert!(deadline < Some(Instant::now()));
140 /// ```
141 #[inline]
142 pub fn get_deadline(&self, key: &K) -> Option<Instant> {
143 self.store.get(key).map(Item::deadline)
144 }
145}
146
147impl<K, V, S> Queue<K, V, S>
148where
149 K: Key,
150 S: StoreMut<K, Item>,
151{
152 /// Sets the deadline of the item identified by the key.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use std::time::Instant;
158 /// use zrx_store::{Queue, StoreMut};
159 ///
160 /// // Create queue and initial state
161 /// let mut queue = Queue::default();
162 /// queue.insert("key", 42);
163 ///
164 /// // Update deadline of item
165 /// queue.set_deadline(&"key", Instant::now());
166 /// ```
167 #[inline]
168 pub fn set_deadline(
169 &mut self, key: &K, deadline: Instant,
170 ) -> Option<Instant> {
171 self.store.remove(key).and_then(|mut item| {
172 item.set_deadline(deadline);
173 self.store
174 .insert(key.clone(), item)
175 .map(|prior| prior.deadline())
176 })
177 }
178}
179
180impl<K, V, S> Queue<K, V, S>
181where
182 K: Key,
183 V: Value,
184 S: StoreMut<K, Item> + StoreIterable<K, Item>,
185{
186 /// Returns the minimum deadline of all items.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use std::time::Instant;
192 /// use zrx_store::{Queue, StoreMut};
193 ///
194 /// // Create queue and initial state
195 /// let mut queue = Queue::default();
196 /// queue.insert("key", 42);
197 ///
198 /// // Obtain minimum deadline of all items
199 /// let deadline = queue.deadline();
200 /// assert!(deadline < Some(Instant::now()));
201 ///
202 #[inline]
203 pub fn deadline(&self) -> Option<Instant> {
204 self.store.iter().next().map(|(_, item)| item.deadline())
205 }
206
207 /// Takes ownership of the next item that is due.
208 ///
209 /// Items are considered to be due if [`Instant::now`] has passed the value
210 /// stored in [`Item::deadline`], which allows to defer processing.
211 ///
212 /// # Examples
213 ///
214 /// ```
215 /// use zrx_store::{Queue, StoreMut};
216 ///
217 /// // Create queue and initial state
218 /// let mut queue = Queue::default();
219 /// queue.insert("a", 4);
220 /// queue.insert("b", 2);
221 /// queue.insert("c", 3);
222 /// queue.insert("d", 1);
223 ///
224 /// // Obtain items from queue
225 /// while let Some((key, value)) = queue.take() {
226 /// println!("{key}: {value}");
227 /// }
228 /// ```
229 #[allow(clippy::missing_panics_doc)]
230 #[inline]
231 pub fn take(&mut self) -> Option<(K, V)> {
232 // Obtain the current instant once to select due items during iteration,
233 // or tight loops might experience slowdowns of up to a factor of 6
234 let deadline = Instant::now();
235 let opt = self.store.iter().next().and_then(|(key, item)| {
236 (item.deadline() <= deadline).then(|| key.clone())
237 });
238
239 // Remove and return the first item we found, which is the next item
240 // in current queue order that can be considered to be due
241 opt.map(|key| {
242 // We can safely use expect here, since we're iterating over a
243 // store that is synchronized with the ordering
244 self.remove(&key)
245 .map(|value| (key, value))
246 .expect("invariant")
247 })
248 }
249}
250
251// ----------------------------------------------------------------------------
252// Trait implementations
253// ----------------------------------------------------------------------------
254
255impl<K, V, S> Store<K, V> for Queue<K, V, S>
256where
257 K: Key,
258 S: Store<K, Item>,
259{
260 /// Returns a reference to the value identified by the key.
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// use zrx_store::{Queue, Store, StoreMut};
266 ///
267 /// // Create queue and initial state
268 /// let mut queue = Queue::default();
269 /// queue.insert("key", 42);
270 ///
271 /// // Obtain reference to value
272 /// let value = queue.get(&"key");
273 /// assert_eq!(value, Some(&42));
274 /// ```
275 #[inline]
276 fn get<Q>(&self, key: &Q) -> Option<&V>
277 where
278 K: Borrow<Q>,
279 Q: Key,
280 {
281 match self.store.get(key) {
282 Some(item) => self.items.get(*item.data()),
283 None => None,
284 }
285 }
286
287 /// Returns whether the queue contains the key.
288 ///
289 /// # Examples
290 ///
291 /// ```
292 /// use zrx_store::{Queue, Store, StoreMut};
293 ///
294 /// // Create queue and initial state
295 /// let mut queue = Queue::default();
296 /// queue.insert("key", 42);
297 ///
298 /// // Ensure presence of key
299 /// let check = queue.contains_key(&"key");
300 /// assert_eq!(check, true);
301 /// ```
302 #[inline]
303 fn contains_key<Q>(&self, key: &Q) -> bool
304 where
305 K: Borrow<Q>,
306 Q: Key,
307 {
308 self.store.contains_key(key)
309 }
310
311 /// Returns the number of items in the queue.
312 #[inline]
313 fn len(&self) -> usize {
314 self.store.len()
315 }
316}
317
318impl<K, V, S> StoreMut<K, V> for Queue<K, V, S>
319where
320 K: Key,
321 V: Value,
322 S: StoreMut<K, Item>,
323{
324 /// Inserts the value identified by the key.
325 ///
326 /// This method only updates the data of the [`Item`], but does not change
327 /// the values of [`Item::deadline`] in case the item already exists.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use zrx_store::{Queue, StoreMut};
333 ///
334 /// // Create queue
335 /// let mut queue = Queue::default();
336 ///
337 /// // Insert value
338 /// let value = queue.insert("key", 42);
339 /// assert_eq!(value, None);
340 /// ```
341 #[inline]
342 fn insert(&mut self, key: K, value: V) -> Option<V> {
343 if let Some(item) = self.store.get(&key) {
344 let prior = &mut self.items[*item.data()];
345 (prior != &value).then(|| mem::replace(prior, value))
346 } else {
347 let index = self.items.insert(value);
348 self.store.insert(key, Item::new(index));
349 None
350 }
351 }
352
353 /// Removes the value identified by the key.
354 ///
355 /// # Examples
356 ///
357 /// ```
358 /// use zrx_store::{Queue, StoreMut};
359 ///
360 /// // Create queue and initial state
361 /// let mut queue = Queue::default();
362 /// queue.insert("key", 42);
363 ///
364 /// // Remove and return value
365 /// let value = queue.remove(&"key");
366 /// assert_eq!(value, Some(42));
367 /// ```
368 #[inline]
369 fn remove<Q>(&mut self, key: &Q) -> Option<V>
370 where
371 K: Borrow<Q>,
372 Q: Key,
373 {
374 self.store
375 .remove(key)
376 .map(|item| self.items.remove(*item.data()))
377 }
378
379 /// Removes the value identified by the key and returns both.
380 ///
381 /// # Examples
382 ///
383 /// ```
384 /// use zrx_store::{Queue, StoreMut};
385 ///
386 /// // Create queue and initial state
387 /// let mut queue = Queue::default();
388 /// queue.insert("key", 42);
389 ///
390 /// // Remove and return entry
391 /// let entry = queue.remove_entry(&"key");
392 /// assert_eq!(entry, Some(("key", 42)));
393 /// ```
394 #[inline]
395 fn remove_entry<Q>(&mut self, key: &Q) -> Option<(K, V)>
396 where
397 K: Borrow<Q>,
398 Q: Key,
399 {
400 self.store
401 .remove_entry(key)
402 .map(|(key, item)| (key, self.items.remove(*item.data())))
403 }
404
405 /// Clears the queue, removing all items.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use zrx_store::{Queue, Store, StoreMut};
411 ///
412 /// // Create queue and initial state
413 /// let mut queue = Queue::default();
414 /// queue.insert("key", 42);
415 ///
416 /// // Clear queue
417 /// queue.clear();
418 /// assert!(queue.is_empty());
419 /// ```
420 #[inline]
421 fn clear(&mut self) {
422 self.store.clear();
423 self.items.clear();
424 }
425}
426
427impl<K, V, S> StoreMutRef<K, V> for Queue<K, V, S>
428where
429 K: Key,
430 S: StoreMut<K, Item>,
431{
432 /// Returns a mutable reference to the value identified by the key.
433 ///
434 /// # Examples
435 ///
436 /// ```
437 /// use zrx_store::{Queue, StoreMut, StoreMutRef};
438 ///
439 /// // Create queue and initial state
440 /// let mut queue = Queue::default();
441 /// queue.insert("key", 42);
442 ///
443 /// // Obtain mutable reference to value
444 /// let mut value = queue.get_mut(&"key");
445 /// assert_eq!(value, Some(&mut 42));
446 /// ```
447 #[inline]
448 fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
449 where
450 K: Borrow<Q>,
451 Q: Key,
452 {
453 match self.store.get(key) {
454 Some(item) => self.items.get_mut(*item.data()),
455 None => None,
456 }
457 }
458
459 /// Returns a mutable reference to the value or creates the default.
460 ///
461 /// # Examples
462 ///
463 /// ```
464 /// use zrx_store::{Queue, StoreMutRef};
465 ///
466 /// // Create queue
467 /// let mut queue = Queue::<_, i32>::default();
468 ///
469 /// // Obtain mutable reference to value
470 /// let value = queue.get_or_insert_default(&"key");
471 /// assert_eq!(value, &mut 0);
472 /// ```
473 #[inline]
474 fn get_or_insert_default(&mut self, key: &K) -> &mut V
475 where
476 V: Default,
477 {
478 if !self.store.contains_key(key) {
479 let index = self.items.insert(V::default());
480 self.store.insert(key.clone(), Item::new(index));
481 }
482
483 // We can safely use expect here, as the key is present
484 self.get_mut(key).expect("invariant")
485 }
486}
487
488// ----------------------------------------------------------------------------
489
490#[allow(clippy::into_iter_without_iter)]
491impl<'a, K, V, S> IntoIterator for &'a Queue<K, V, S>
492where
493 K: Key,
494 V: Value,
495 S: StoreIterable<K, Item>,
496{
497 type Item = (&'a K, &'a V);
498 type IntoIter = Iter<'a, K, V>;
499
500 /// Creates an iterator over the queue.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// use zrx_store::{Queue, StoreMut};
506 ///
507 /// // Create queue and initial state
508 /// let mut queue = Queue::default();
509 /// queue.insert("key", 42);
510 ///
511 /// // Create iterator over the queue
512 /// for (key, value) in &queue {
513 /// println!("{key}: {value}");
514 /// }
515 /// ```
516 #[inline]
517 fn into_iter(self) -> Self::IntoIter {
518 self.iter()
519 }
520}
521
522#[allow(clippy::into_iter_without_iter)]
523impl<'a, K, V, S> IntoIterator for &'a mut Queue<K, V, S>
524where
525 K: Key,
526 V: Value,
527 S: StoreMut<K, Item> + StoreIterable<K, Item>,
528{
529 type Item = (&'a K, &'a mut V);
530 type IntoIter = IterMut<'a, K, V>;
531
532 /// Creates a mutable iterator over the queue.
533 ///
534 /// # Examples
535 ///
536 /// ```
537 /// use zrx_store::{Queue, StoreMut};
538 ///
539 /// // Create queue and initial state
540 /// let mut queue = Queue::default();
541 /// queue.insert("key", 42);
542 ///
543 /// // Create iterator over the queue
544 /// for (key, value) in &mut queue {
545 /// println!("{key}: {value}");
546 /// }
547 /// ```
548 #[inline]
549 fn into_iter(self) -> Self::IntoIter {
550 self.iter_mut()
551 }
552}
553
554// ----------------------------------------------------------------------------
555
556impl<K, V> Default for Queue<K, V>
557where
558 K: Key,
559{
560 /// Creates a queue with [`HashMap::default`][] as a store.
561 ///
562 /// Note that this method does not allow to customize the [`BuildHasher`][],
563 /// but uses [`ahash`] by default, which is the fastest known hasher.
564 ///
565 /// [`BuildHasher`]: std::hash::BuildHasher
566 /// [`HashMap::default`]: Default::default
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// use zrx_store::{Queue, StoreMut};
572 ///
573 /// // Create queue
574 /// let mut queue = Queue::default();
575 ///
576 /// // Insert value
577 /// queue.insert("key", 42);
578 /// ```
579 #[inline]
580 fn default() -> Self {
581 Self::new()
582 }
583}
584
585// ----------------------------------------------------------------------------
586
587impl<K, V, S> Debug for Queue<K, V, S>
588where
589 K: Debug,
590 V: Debug,
591 S: Debug,
592{
593 /// Formats the queue for debugging.
594 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
595 f.debug_struct("Queue")
596 .field("store", &self.store)
597 .field("items", &self.items)
598 .finish()
599 }
600}