Skip to main content

throttle_net/
queue.rs

1//! A bounded, deadline-aware, priority queue in front of a limiter.
2//!
3//! When a limiter is saturated, callers can either be rejected or *wait*. A
4//! [`Queue`] lets them wait in an orderly way: it admits up to a fixed number of
5//! waiters, serves them by priority (and fairly across keys at equal priority),
6//! and **drops a waiter whose deadline has passed rather than serving it**. When
7//! the queue is full, an [`Overflow`] policy decides who is turned away.
8//!
9//! The queue requires an async runtime (`tokio` feature). Acquisition is a single
10//! token from the wrapped limiter per call.
11//!
12//! ## Scheduling
13//!
14//! At any moment the eligible waiter with the highest priority holds the turn; it
15//! is the one that draws from the limiter, so lower-priority waiters never jump
16//! ahead. Among equal priorities the least-recently-served key goes next (fair
17//! across keys), and within a key it is first-come-first-served. Expired waiters
18//! are skipped when choosing who to serve, so a dead waiter never blocks a live
19//! one.
20
21use core::hash::Hash;
22use core::time::Duration;
23use std::collections::HashMap;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
26
27use clock_lib::{Clock, Monotonic, SystemClock};
28use tokio::sync::Notify;
29
30use crate::decision::Decision;
31use crate::error::ThrottleError;
32use crate::limiter::Limiter;
33
/// Policy applied to an incoming request when the queue is already at capacity.
///
/// `#[non_exhaustive]`: further policies may be introduced later.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Overflow {
    /// Refuse the newcomer with [`ThrottleError::QueueFull`]; nobody already
    /// queued is disturbed.
    #[default]
    Reject,
    /// Make room by evicting the waiter that was enqueued first.
    DropOldest,
    /// Make room by evicting the lowest-priority waiter, but only when the
    /// newcomer strictly outranks it. A newcomer that is itself the lowest (or
    /// merely ties the lowest) is rejected instead.
    DropLowestPriority,
}
49
/// Bookkeeping for a single parked request.
struct Waiter<K> {
    /// Position in enqueue order. Breaks ties first-come-first-served and
    /// identifies the "oldest" waiter.
    seq: u64,
    /// Scheduling rank; larger values are served earlier.
    priority: u32,
    /// Absolute expiry in store-epoch milliseconds; `None` never expires.
    deadline_ms: Option<u64>,
    /// Fairness key used to rotate service between callers of equal priority.
    key: K,
    /// Raised when an overflow policy evicts this waiter, so the waiting task
    /// can bail out with [`ThrottleError::QueueFull`].
    evicted: Arc<AtomicBool>,
}

/// Everything the scheduler mutates; the queue keeps it behind a single mutex.
struct State<K> {
    /// Parked waiters, indexed by waiter id.
    waiters: HashMap<u64, Waiter<K>>,
    /// Count of waiters served so far; each service stamps this value into
    /// `last_served`.
    service_seq: u64,
    /// Source of waiter ids, bumped on every enqueue.
    next_seq: u64,
    /// Most recent service stamp per key, consulted for cross-key fairness.
    /// Entries are added by `serve`; nothing in this impl removes them.
    last_served: HashMap<K, u64>,
}

impl<K: Eq + Hash + Clone> State<K> {
    /// An empty scheduler with both counters at zero.
    fn new() -> Self {
        Self {
            waiters: HashMap::new(),
            last_served: HashMap::new(),
            service_seq: 0,
            next_seq: 0,
        }
    }

    /// Whether `waiter` may still be served at `now_ms`: it has no deadline, or
    /// its deadline lies strictly in the future.
    fn is_live(waiter: &Waiter<K>, now_ms: u64) -> bool {
        match waiter.deadline_ms {
            Some(deadline) => now_ms < deadline,
            None => true,
        }
    }

    /// Discards every waiter that is no longer live at `now_ms`. The waiting
    /// tasks notice on their next check and return
    /// [`ThrottleError::DeadlineExceeded`].
    fn prune_expired(&mut self, now_ms: u64) {
        self.waiters
            .retain(|_, waiter| Self::is_live(waiter, now_ms));
    }

    /// Picks the id of the next waiter to serve, ignoring expired ones.
    ///
    /// Ordering: highest priority, then the key served least recently, then the
    /// smallest `seq`.
    fn winner(&self, now_ms: u64) -> Option<u64> {
        self.waiters
            .iter()
            .filter(|(_, waiter)| Self::is_live(waiter, now_ms))
            .min_by_key(|(_, waiter)| {
                (
                    // Reversed so that a larger priority compares as smaller.
                    core::cmp::Reverse(waiter.priority),
                    self.recency(&waiter.key),
                    waiter.seq,
                )
            })
            .map(|(id, _)| *id)
    }

    /// The stamp of the last service for `key`; a key never served yields `0`,
    /// which sorts ahead of every real stamp (stamps start at `1`).
    fn recency(&self, key: &K) -> u64 {
        match self.last_served.get(key) {
            Some(stamp) => *stamp,
            None => 0,
        }
    }

    /// Records that `id` was served: removes it and stamps its key with a fresh
    /// service number. Unknown ids are ignored.
    fn serve(&mut self, id: u64) {
        let Some(served) = self.waiters.remove(&id) else {
            return;
        };
        self.service_seq += 1;
        let stamp = self.service_seq;
        let _ = self.last_served.insert(served.key, stamp);
    }

    /// Enqueues a new waiter and hands back its id together with a handle on
    /// its eviction flag.
    fn insert(
        &mut self,
        priority: u32,
        deadline_ms: Option<u64>,
        key: K,
    ) -> (u64, Arc<AtomicBool>) {
        let id = self.next_seq;
        self.next_seq += 1;

        let flag = Arc::new(AtomicBool::new(false));
        let waiter = Waiter {
            seq: id,
            priority,
            deadline_ms,
            key,
            evicted: Arc::clone(&flag),
        };
        let _ = self.waiters.insert(id, waiter);
        (id, flag)
    }

    /// The id of the waiter with the smallest `seq`, i.e. the victim for
    /// [`Overflow::DropOldest`].
    fn oldest(&self) -> Option<u64> {
        self.waiters
            .iter()
            .min_by(|(_, a), (_, b)| a.seq.cmp(&b.seq))
            .map(|(id, _)| *id)
    }

    /// The id and priority of the weakest waiter, i.e. the candidate victim for
    /// [`Overflow::DropLowestPriority`]: lowest priority, and among those the
    /// most recently enqueued.
    fn weakest(&self) -> Option<(u64, u32)> {
        self.waiters
            .iter()
            .min_by_key(|(_, waiter)| (waiter.priority, core::cmp::Reverse(waiter.seq)))
            .map(|(id, waiter)| (*id, waiter.priority))
    }
}
161
162/// A bounded, deadline-aware, priority queue fronting a limiter `L`, keyed by `K`
163/// for fairness and timed by clock `C`.
164///
165/// Build one with [`Queue::builder`]. Use `K = ()` for a plain priority queue
166/// with no cross-key fairness.
167///
168/// # Examples
169///
170/// ```
171/// # async fn run() -> Result<(), throttle_net::ThrottleError> {
172/// use std::time::Duration;
173/// use throttle_net::{Overflow, Queue, Throttle};
174///
175/// // 50 req/s, with room for 100 waiters; reject when full.
176/// let queue: Queue<Throttle, &str> = Queue::builder()
177///     .capacity(100)
178///     .overflow(Overflow::DropOldest)
179///     .build(Throttle::per_second(50));
180///
181/// // Wait for a slot, but give up after 2 seconds.
182/// queue
183///     .acquire("tenant:1", 0, Some(Duration::from_secs(2)))
184///     .await?;
185/// # Ok(())
186/// # }
187/// ```
188pub struct Queue<L, K = (), C = SystemClock>
189where
190    K: Eq + Hash + Clone + Send + Sync,
191    C: Clock,
192{
193    inner: L,
194    state: Mutex<State<K>>,
195    notify: Notify,
196    capacity: usize,
197    overflow: Overflow,
198    clock: C,
199    epoch: Monotonic,
200}
201
202// Anchored on a concrete, limiter- and key-free type so `Queue::builder()` needs
203// no type annotation; `L` and `K` are fixed later by [`QueueBuilder::build`].
204impl Queue<core::convert::Infallible, ()> {
205    /// Starts building a queue.
206    #[must_use]
207    pub fn builder() -> QueueBuilder {
208        QueueBuilder::new()
209    }
210}
211
212impl<L, K, C> Queue<L, K, C>
213where
214    L: Limiter,
215    K: Eq + Hash + Clone + Send + Sync,
216    C: Clock + Clone,
217{
218    fn new(inner: L, capacity: usize, overflow: Overflow, clock: C) -> Self {
219        let epoch = clock.now();
220        Self {
221            inner,
222            state: Mutex::new(State::new()),
223            notify: Notify::new(),
224            capacity: capacity.max(1),
225            overflow,
226            clock,
227            epoch,
228        }
229    }
230
231    /// Replaces the time source (the deadline clock), for deterministic tests.
232    /// The queue is rebuilt empty around the new clock.
233    #[must_use]
234    pub fn with_clock<C2>(self, clock: C2) -> Queue<L, K, C2>
235    where
236        C2: Clock + Clone,
237    {
238        Queue::new(self.inner, self.capacity, self.overflow, clock)
239    }
240
241    /// The number of waiters currently enqueued (a momentary snapshot).
242    #[must_use]
243    pub fn len(&self) -> usize {
244        self.lock().waiters.len()
245    }
246
247    /// Returns `true` if no waiters are enqueued.
248    #[must_use]
249    pub fn is_empty(&self) -> bool {
250        self.lock().waiters.is_empty()
251    }
252
253    /// The configured waiter capacity.
254    #[must_use]
255    pub fn capacity(&self) -> usize {
256        self.capacity
257    }
258
259    /// A shared reference to the wrapped limiter.
260    pub fn inner(&self) -> &L {
261        &self.inner
262    }
263
264    #[inline]
265    fn lock(&self) -> MutexGuard<'_, State<K>> {
266        self.state.lock().unwrap_or_else(PoisonError::into_inner)
267    }
268
269    #[inline]
270    fn now_ms(&self) -> u64 {
271        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
272        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
273    }
274
275    /// Inserts a waiter, applying the overflow policy if the queue is full.
276    ///
277    /// On success (and whenever a waiter is evicted) the peers are woken so the
278    /// next-in-line re-evaluates its turn, an evicted waiter learns it was dropped,
279    /// and a higher-priority newcomer can preempt a sleeping lower-priority one.
280    fn register(
281        &self,
282        now_ms: u64,
283        priority: u32,
284        deadline_ms: Option<u64>,
285        key: &K,
286    ) -> Result<(u64, Arc<AtomicBool>), ThrottleError> {
287        let mut did_evict = false;
288        let outcome = {
289            let mut state = self.lock();
290            state.prune_expired(now_ms);
291
292            if state.waiters.len() < self.capacity {
293                Ok(state.insert(priority, deadline_ms, key.clone()))
294            } else {
295                match self.overflow {
296                    Overflow::Reject => Err(ThrottleError::QueueFull),
297                    Overflow::DropOldest => match state.oldest() {
298                        Some(victim) => {
299                            evict(&mut state, victim);
300                            did_evict = true;
301                            Ok(state.insert(priority, deadline_ms, key.clone()))
302                        }
303                        None => Err(ThrottleError::QueueFull),
304                    },
305                    Overflow::DropLowestPriority => match state.weakest() {
306                        // Evict only if the newcomer outranks the weakest resident.
307                        Some((victim, weakest)) if priority > weakest => {
308                            evict(&mut state, victim);
309                            did_evict = true;
310                            Ok(state.insert(priority, deadline_ms, key.clone()))
311                        }
312                        _ => Err(ThrottleError::QueueFull),
313                    },
314                }
315            }
316        };
317
318        if did_evict || outcome.is_ok() {
319            self.notify.notify_waiters();
320        }
321        outcome
322    }
323
324    /// Acquires one token, waiting in the queue until served, the deadline
325    /// passes, or the overflow policy turns the request away.
326    ///
327    /// `priority` orders waiters (higher first). `key` is the fairness key —
328    /// equal-priority waiters are served round-robin across keys. `deadline` is a
329    /// wait budget; `None` waits indefinitely.
330    ///
331    /// # Errors
332    ///
333    /// - [`ThrottleError::QueueFull`] when the queue is full and the policy
334    ///   rejects (or evicts) this request.
335    /// - [`ThrottleError::DeadlineExceeded`] when the deadline passes first.
336    /// - [`ThrottleError::CostExceedsCapacity`] when the wrapped limiter can
337    ///   never grant a single unit.
338    pub async fn acquire(
339        &self,
340        key: K,
341        priority: u32,
342        deadline: Option<Duration>,
343    ) -> Result<(), ThrottleError> {
344        let start_ms = self.now_ms();
345        let deadline_ms = deadline
346            .map(|d| start_ms.saturating_add(u64::try_from(d.as_millis()).unwrap_or(u64::MAX)));
347
348        let (id, evicted) = self.register(start_ms, priority, deadline_ms, &key)?;
349        // Ensure the waiter is removed and peers are woken on any exit path.
350        let _guard = LeaveGuard { queue: self, id };
351
352        loop {
353            // Register interest before checking, so a wake between the check and
354            // the await is not lost.
355            let notified = self.notify.notified();
356            tokio::pin!(notified);
357            // Register interest now; ignore whether a notification was already
358            // pending (the loop re-checks the condition regardless).
359            let _ = notified.as_mut().enable();
360
361            if evicted.load(Ordering::Acquire) {
362                return Err(ThrottleError::QueueFull);
363            }
364
365            let now_ms = self.now_ms();
366            if deadline_ms.is_some_and(|d| now_ms >= d) {
367                return Err(ThrottleError::DeadlineExceeded);
368            }
369
370            let wait = {
371                let mut state = self.lock();
372                if state.winner(now_ms) == Some(id) {
373                    match self.inner.acquire_cost(1) {
374                        Decision::Acquired => {
375                            state.serve(id);
376                            drop(state);
377                            self.notify.notify_waiters();
378                            return Ok(());
379                        }
380                        Decision::Impossible => {
381                            return Err(ThrottleError::CostExceedsCapacity {
382                                cost: 1,
383                                capacity: self.inner.capacity(),
384                            });
385                        }
386                        // The turn is ours but no token yet; wait for the refill.
387                        Decision::Retry { after } => after,
388                    }
389                } else {
390                    // Not our turn; wait to be promoted (or until our deadline).
391                    Duration::from_secs(3600)
392                }
393            };
394
395            let sleep_for = cap_to_deadline(wait, now_ms, deadline_ms);
396            tokio::select! {
397                () = notified.as_mut() => {}
398                () = tokio::time::sleep(sleep_for) => {}
399            }
400        }
401    }
402}
403
/// Shortens `wait` so that a sleep starting at `now_ms` ends no later than
/// `deadline_ms`. Without a deadline the wait is returned untouched; with a
/// deadline already reached the result is zero.
fn cap_to_deadline(wait: Duration, now_ms: u64, deadline_ms: Option<u64>) -> Duration {
    let Some(deadline) = deadline_ms else {
        return wait;
    };
    let remaining = Duration::from_millis(deadline.saturating_sub(now_ms));
    if remaining < wait {
        remaining
    } else {
        wait
    }
}
411
412/// Evicts a waiter on behalf of an overflow policy.
413fn evict<K: Eq + Hash + Clone>(state: &mut State<K>, id: u64) {
414    if let Some(w) = state.waiters.remove(&id) {
415        w.evicted.store(true, Ordering::Release);
416    }
417}
418
419/// Removes a waiter and wakes its peers when its task leaves the queue.
420struct LeaveGuard<'a, L, K, C>
421where
422    L: Limiter,
423    K: Eq + Hash + Clone + Send + Sync,
424    C: Clock + Clone,
425{
426    queue: &'a Queue<L, K, C>,
427    id: u64,
428}
429
430impl<L, K, C> Drop for LeaveGuard<'_, L, K, C>
431where
432    L: Limiter,
433    K: Eq + Hash + Clone + Send + Sync,
434    C: Clock + Clone,
435{
436    fn drop(&mut self) {
437        {
438            let mut state = self.queue.lock();
439            let _ = state.waiters.remove(&self.id);
440        }
441        // Wake peers so the next-in-line re-evaluates its turn.
442        self.queue.notify.notify_waiters();
443    }
444}
445
446/// Builder for a [`Queue`].
447#[derive(Debug, Clone, Copy)]
448pub struct QueueBuilder {
449    capacity: usize,
450    overflow: Overflow,
451}
452
453impl Default for QueueBuilder {
454    fn default() -> Self {
455        Self::new()
456    }
457}
458
459impl QueueBuilder {
460    /// Creates a builder with a default capacity of 1024 and [`Overflow::Reject`].
461    #[must_use]
462    pub fn new() -> Self {
463        Self {
464            capacity: 1024,
465            overflow: Overflow::Reject,
466        }
467    }
468
469    /// Sets the maximum number of simultaneous waiters (clamped to at least one).
470    #[must_use]
471    pub fn capacity(mut self, capacity: usize) -> Self {
472        self.capacity = capacity.max(1);
473        self
474    }
475
476    /// Sets the policy applied when the queue is full.
477    #[must_use]
478    pub fn overflow(mut self, overflow: Overflow) -> Self {
479        self.overflow = overflow;
480        self
481    }
482
483    /// Wraps `limiter`, producing a queue driven by the system clock.
484    #[must_use]
485    pub fn build<L, K>(self, limiter: L) -> Queue<L, K, SystemClock>
486    where
487        L: Limiter,
488        K: Eq + Hash + Clone + Send + Sync,
489    {
490        Queue::new(limiter, self.capacity, self.overflow, SystemClock::new())
491    }
492}
493
#[cfg(test)]
mod tests {
    // Tests may unwrap: a failure there is exactly what should abort the test.
    #![allow(clippy::unwrap_used)]

    use super::{Overflow, Queue, State, Waiter};
    use crate::throttle::Throttle;
    use core::time::Duration;
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;

    fn assert_send_sync<T: Send + Sync>() {}

    /// Builds a bare waiter for the scheduler-only tests below.
    fn waiter(
        seq: u64,
        priority: u32,
        deadline_ms: Option<u64>,
        key: &'static str,
    ) -> Waiter<&'static str> {
        Waiter {
            seq,
            priority,
            deadline_ms,
            key,
            evicted: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Parks `w` under its own `seq`, mirroring how `State::insert` uses the
    /// same number for the id and the enqueue order.
    fn enqueue(state: &mut State<&'static str>, w: Waiter<&'static str>) {
        let _ = state.waiters.insert(w.seq, w);
    }

    #[test]
    fn test_queue_is_send_sync() {
        assert_send_sync::<Queue<Throttle, &'static str>>();
    }

    #[tokio::test]
    async fn test_immediate_acquire_when_token_is_free() {
        let queue: Queue<Throttle, ()> = Queue::builder().build(Throttle::per_second(10));
        assert!(queue.acquire((), 0, None).await.is_ok());
        assert!(queue.is_empty());
    }

    #[tokio::test]
    async fn test_cost_exceeds_capacity_is_reported() {
        let queue: Queue<Throttle, ()> = Queue::builder().build(Throttle::per_second(0));
        let err = queue.acquire((), 0, Some(Duration::from_secs(1))).await;
        assert!(matches!(
            err,
            Err(crate::ThrottleError::CostExceedsCapacity { .. })
        ));
    }

    #[tokio::test]
    async fn test_deadline_exceeded_when_no_token_arrives() {
        // A drained 1/hour limiter won't refill within the short deadline, so the
        // waiter is dropped with DeadlineExceeded. Real time, small deadline.
        let queue: Queue<Throttle, ()> =
            Queue::builder().build(Throttle::per_duration(1, Duration::from_secs(3600)));
        assert!(queue.acquire((), 0, None).await.is_ok()); // takes the only token

        let err = queue.acquire((), 0, Some(Duration::from_millis(30))).await;
        assert!(matches!(err, Err(crate::ThrottleError::DeadlineExceeded)));
        assert!(queue.is_empty(), "the expired waiter is removed");
    }

    #[tokio::test]
    async fn test_reject_overflow_when_full() {
        // Capacity 1; the first waiter occupies it (parked on a drained limiter),
        // the second is rejected immediately.
        let queue: Arc<Queue<Throttle, ()>> = Arc::new(
            Queue::builder()
                .capacity(1)
                .overflow(Overflow::Reject)
                .build(Throttle::per_duration(1, Duration::from_secs(3600))),
        );
        assert!(queue.acquire((), 0, None).await.is_ok()); // consumes the token

        let q = Arc::clone(&queue);
        let parked = tokio::spawn(async move { q.acquire((), 0, None).await });
        // Spin until the spawned task has actually registered as a waiter.
        while queue.is_empty() {
            tokio::task::yield_now().await;
        }
        let rejected = queue.acquire((), 0, Some(Duration::from_secs(1))).await;
        assert!(matches!(rejected, Err(crate::ThrottleError::QueueFull)));
        parked.abort();
    }

    #[tokio::test]
    async fn test_drop_oldest_overflow_evicts_the_first_waiter() {
        let queue: Arc<Queue<Throttle, ()>> = Arc::new(
            Queue::builder()
                .capacity(1)
                .overflow(Overflow::DropOldest)
                .build(Throttle::per_duration(1, Duration::from_secs(3600))),
        );
        assert!(queue.acquire((), 0, None).await.is_ok()); // drain the token

        // First waiter parks, occupying the single slot.
        let q = Arc::clone(&queue);
        let first = tokio::spawn(async move { q.acquire((), 0, None).await });
        while queue.is_empty() {
            tokio::task::yield_now().await;
        }
        // Second waiter evicts the first; the first returns QueueFull.
        let q = Arc::clone(&queue);
        let second = tokio::spawn(async move { q.acquire((), 0, None).await });
        let first_result = first.await.unwrap();
        assert!(matches!(first_result, Err(crate::ThrottleError::QueueFull)));
        second.abort();
    }

    #[tokio::test]
    async fn test_priority_is_served_high_first() {
        // One token every 50ms — a wide margin over the microseconds it takes the
        // three waiters to register, so all are parked before the first refill and
        // the served order is determined purely by priority, not by timing.
        let queue: Arc<Queue<Throttle, ()>> = Arc::new(
            Queue::builder()
                .capacity(10)
                .build(Throttle::per_duration(1, Duration::from_millis(50))),
        );
        assert!(queue.acquire((), 0, None).await.is_ok()); // drain the one token

        let order = Arc::new(std::sync::Mutex::new(Vec::new()));

        let mut handles = Vec::new();
        for priority in [1u32, 5, 3] {
            let q = Arc::clone(&queue);
            let order = Arc::clone(&order);
            handles.push(tokio::spawn(async move {
                q.acquire((), priority, None).await.unwrap();
                order.lock().unwrap().push(priority);
            }));
        }
        // Ensure all three have registered before tokens start flowing.
        while queue.len() < 3 {
            tokio::task::yield_now().await;
        }
        for h in handles {
            h.await.unwrap();
        }

        assert_eq!(*order.lock().unwrap(), vec![5, 3, 1]);
    }

    #[test]
    fn test_fair_winner_rotates_across_keys_at_equal_priority() {
        let mut state = State::<&'static str>::new();
        // Two for key "a", one for key "b", all equal priority.
        enqueue(&mut state, waiter(0, 0, None, "a"));
        enqueue(&mut state, waiter(1, 0, None, "a"));
        enqueue(&mut state, waiter(2, 0, None, "b"));

        // No key served yet: tie broken by seq, so the first "a" goes.
        assert_eq!(state.winner(0), Some(0));
        state.serve(0);
        // Key "a" was just served, so the least-recently-served key "b" goes next
        // even though another "a" is older by seq — fair across keys.
        assert_eq!(state.winner(0), Some(2));
        state.serve(2);
        // Only the second "a" remains.
        assert_eq!(state.winner(0), Some(1));
    }

    #[test]
    fn test_priority_beats_fairness_in_winner_selection() {
        let mut state = State::<&'static str>::new();
        enqueue(&mut state, waiter(0, 1, None, "a"));
        enqueue(&mut state, waiter(1, 9, None, "b"));
        // Higher priority wins regardless of key recency or seq.
        assert_eq!(state.winner(0), Some(1));
    }

    #[test]
    fn test_winner_skips_expired_waiters() {
        let mut state = State::<&'static str>::new();
        enqueue(&mut state, waiter(0, 9, Some(100), "a"));
        enqueue(&mut state, waiter(1, 1, None, "b"));
        // At t=200 the high-priority waiter has expired, so the live one wins.
        assert_eq!(state.winner(200), Some(1));
    }
}
719}