Skip to main content

rivet/
gpu_pool.rs

1//! Process-wide GPU reservation pool.
2//!
3//! Each detected GPU is a slot. Callers `claim()` an available slot
4//! and hold the returned `GpuLease` for the duration of their work;
5//! `Drop` releases the slot back to the pool. The lease's
6//! `gpu_index` field is the device index the work should run on.
7//!
8//! Concurrency model: one variant per GPU at any time. With N GPUs
9//! and M waiters, the first N waiters get leases immediately and the
10//! remaining M−N park on the semaphore until a lease drops. This is
11//! the deliberate design decision from 2026-05-02 — concurrent
12//! NVENC sessions on the same CUDA context deadlocked at session
13//! ~5/5 init, GPU went idle, no frames encoded. One-encoder-per-GPU
14//! is the load-bearing invariant; the pool's role is to enforce it
15//! while still letting variants run in parallel ACROSS GPUs.
16//!
17//! CPU-only hosts (no GPUs detected): `claim()` returns `None`
18//! immediately — callers fall back to CPU encode without queuing.
19
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
23use codec::gpu::{GpuDevice, GpuVendor};
24use tokio::sync::{OwnedSemaphorePermit, Semaphore};
25
26pub struct GpuPool {
27    /// Per-slot GPU device index (`GpuDevice.index`, not vec position
28    /// — accommodates sparse `CUDA_VISIBLE_DEVICES` setups).
29    gpu_indices: Vec<u32>,
30    /// Per-slot vendor — load-bearing for the encoder factory's
31    /// vendor-aware dispatch. Without it, multi-vendor hosts (NVIDIA
32    /// + Intel Arc) ALWAYS picked NVENC because the factory tries
33    /// NVIDIA first and both vendors expose index 0; the Arc sat
34    /// idle even when the NVIDIA card was busy.
35    gpu_vendors: Vec<GpuVendor>,
36    /// Per-slot human-readable device name. Used by `snapshot_leases`
37    /// (Phase 2 worker_load reporting) so the backend's admin view
38    /// can label each GPU lease badge with the same string the hello
39    /// frame already advertised. Stays in lockstep with the hello
40    /// frame's `WsGpuInfo.name`.
41    gpu_names: Vec<String>,
42    /// Per-slot free flag. `true` = available; `false` = leased.
43    /// Atomic so the CAS-find-free-slot path under `claim()` is
44    /// lock-free; correctness is enforced by the semaphore counting
45    /// (see `claim`).
46    free: Arc<Vec<AtomicBool>>,
47    /// Semaphore with N permits (= number of GPUs). Acquiring a
48    /// permit guarantees at least one `free` slot exists, so the
49    /// CAS loop in `claim()` always succeeds without retry.
50    permits: Arc<Semaphore>,
51    /// Count of variant tasks currently blocked inside `claim()`'s
52    /// `acquire_owned().await`. Used by the LeaseArbiter (planned
53    /// 2026-05-10) to decide whether to dispatch a helper task: if
54    /// any variant is already waiting for a permit, that variant
55    /// must claim before the arbiter steals a permit for a helper.
56    /// Incremented immediately before `acquire_owned().await`;
57    /// decremented as soon as the await returns (success or
58    /// cancellation) via the `PendingClaimGuard` RAII helper.
59    ///
60    /// `try_claim()` does NOT touch this — helpers are not blocked
61    /// claimers in the spare-capacity sense.
62    pending_claimers: Arc<AtomicUsize>,
63}
64
/// Scope guard that keeps a shared "pending claimers" counter
/// accurate: constructing it adds one to the counter, dropping it
/// subtracts one.
///
/// `claim()` wraps its `acquire_owned().await` in one of these. If
/// the awaiting task is cancelled mid-await its future is dropped,
/// which drops the guard too, so the decrement still happens.
struct PendingClaimGuard {
    /// Shared counter this guard is responsible for decrementing.
    pending: Arc<AtomicUsize>,
}

impl PendingClaimGuard {
    /// Registers one pending claimer on `counter` and returns the
    /// guard that will unregister it again when dropped.
    fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::AcqRel);
        Self { pending: counter }
    }
}

impl Drop for PendingClaimGuard {
    /// Unregisters the pending claimer added by `new`.
    fn drop(&mut self) {
        self.pending.fetch_sub(1, Ordering::AcqRel);
    }
}
86
87/// Snapshot of one GPU slot's lease state at a moment in time.
88/// Returned by [`GpuPool::snapshot_leases`] for Phase 2 worker_load
89/// reporting. Field shape matches `queue::WsGpuLeaseEntry` so the
90/// caller can map across without a wire-format-aware translation.
91#[derive(Debug, Clone)]
92pub struct GpuLeaseEntry {
93    pub vendor: GpuVendor,
94    pub name: String,
95    pub index: u32,
96    pub leased: bool,
97}
98
99/// RAII guard returned by `GpuPool::claim`. The slot is released
100/// (and the underlying semaphore permit dropped) when this value
101/// is dropped — typically at the end of the variant's encode task.
102pub struct GpuLease {
103    pub gpu_index: u32,
104    pub vendor: GpuVendor,
105    slot_idx: usize,
106    free: Arc<Vec<AtomicBool>>,
107    _permit: OwnedSemaphorePermit,
108}
109
110impl Drop for GpuLease {
111    fn drop(&mut self) {
112        self.free[self.slot_idx].store(true, Ordering::Release);
113    }
114}
115
116impl GpuPool {
117    /// Build a pool from the host's detected GPU inventory. An empty
118    /// inventory is permitted — the resulting pool always returns
119    /// `None` from `claim()` so CPU-only hosts work without
120    /// special-casing at the call site.
121    pub fn new(devices: &[GpuDevice]) -> Self {
122        let n = devices.len();
123        Self {
124            gpu_indices: devices.iter().map(|d| d.index).collect(),
125            gpu_vendors: devices.iter().map(|d| d.vendor).collect(),
126            gpu_names: devices.iter().map(|d| d.name.clone()).collect(),
127            free: Arc::new((0..n).map(|_| AtomicBool::new(true)).collect()),
128            // Semaphore::new(0) is valid but `acquire` would deadlock.
129            // We never acquire on the empty path because `claim()`
130            // returns `None` early on CPU-only hosts.
131            permits: Arc::new(Semaphore::new(n)),
132            pending_claimers: Arc::new(AtomicUsize::new(0)),
133        }
134    }
135
136    /// How many variant tasks are currently parked inside `claim()`
137    /// waiting for a permit. The LeaseArbiter consults this to decide
138    /// whether to dispatch a helper: when `pending_claimers() > 0`,
139    /// at least one variant task wants a GPU and the arbiter must
140    /// step back so the variant claims first (FIFO fairness).
141    ///
142    /// Reads with `Ordering::Acquire`. The result is momentary — by
143    /// the time the caller observes it, a claim may have resolved or
144    /// a new one parked. That's expected; the arbiter re-checks
145    /// before each dispatch decision.
146    pub fn pending_claimers(&self) -> usize {
147        self.pending_claimers.load(Ordering::Acquire)
148    }
149
150    /// How many GPUs this pool manages. Useful for pre-spawning
151    /// variants when fewer variants exist than GPUs (no point
152    /// over-claiming).
153    pub fn capacity(&self) -> usize {
154        self.gpu_indices.len()
155    }
156
157    /// Snapshot per-GPU lease state. Result preserves slot order
158    /// (matches the order [`GpuPool::new`] saw devices), so callers
159    /// stitching the result against the hello frame's `gpu_pool`
160    /// see consistent indices across both reports.
161    ///
162    /// Reads `free` slots with `Ordering::Acquire`. The result is a
163    /// momentary snapshot — by the time the caller observes it, a
164    /// claim or drop may have flipped any slot. That's expected;
165    /// load reporting is best-effort observability, not a
166    /// transactional view.
167    ///
168    /// Used by the worker's Phase 2 (2026-05-07) load-tick task to
169    /// build the `worker_load` frame's `gpu_pool` field.
170    pub fn snapshot_leases(&self) -> Vec<GpuLeaseEntry> {
171        self.gpu_indices
172            .iter()
173            .zip(self.gpu_vendors.iter())
174            .zip(self.gpu_names.iter())
175            .enumerate()
176            .map(|(slot_idx, ((index, vendor), name))| GpuLeaseEntry {
177                vendor: *vendor,
178                name: name.clone(),
179                index: *index,
180                leased: !self.free[slot_idx].load(Ordering::Acquire),
181            })
182            .collect()
183    }
184
185    /// Claim an available GPU. Awaits if every GPU is currently
186    /// leased. Returns `None` immediately on CPU-only hosts — the
187    /// caller should fall back to CPU encode.
188    pub async fn claim(self: &Arc<Self>) -> Option<GpuLease> {
189        if self.gpu_indices.is_empty() {
190            return None;
191        }
192        // Track "blocked waiting for a permit" for the LeaseArbiter's
193        // fairness check. Guard is scoped to the await: on success the
194        // guard drops at end-of-block (decrement); on cancellation the
195        // future is dropped mid-await, the guard drops, and the
196        // counter still decrements. Either way the count stays
197        // accurate.
198        let permit = {
199            let _pending = PendingClaimGuard::new(Arc::clone(&self.pending_claimers));
200            Arc::clone(&self.permits)
201                .acquire_owned()
202                .await
203                .expect("GpuPool semaphore should never be closed")
204        };
205        // The permit guarantees ≥1 free slot. None here means the
206        // semaphore count and free-flag count drifted apart — a bug
207        // (RAII Drop bypassed, wrong atomic ordering, etc.).
208        match self.assign_free_slot(permit) {
209            Some(lease) => Some(lease),
210            None => unreachable!(
211                "GpuPool: permit acquired but no free slot found — \
212                 semaphore count and free-flag count drifted apart"
213            ),
214        }
215    }
216
217    /// Try to claim a GPU without blocking. Returns `None` if every
218    /// GPU is currently leased OR if the host has no GPUs.
219    ///
220    /// Used by the LeaseArbiter (planned 2026-05-10) to grab a helper
221    /// lease without contending with blocked variant tasks. Tokio's
222    /// Semaphore preserves FIFO ordering for queued waiters — a
223    /// permit released while a variant task is parked in
224    /// `acquire_owned().await` is reserved for that waiter and is NOT
225    /// visible to `try_acquire_owned()`, so this method cannot steal
226    /// a permit out from under a queued variant.
227    ///
228    /// Does NOT increment `pending_claimers`; helpers are not blocked
229    /// claimers in the spare-capacity sense.
230    pub fn try_claim(self: &Arc<Self>) -> Option<GpuLease> {
231        if self.gpu_indices.is_empty() {
232            return None;
233        }
234        let permit = Arc::clone(&self.permits).try_acquire_owned().ok()?;
235        Some(self.assign_free_slot(permit).expect(
236            "GpuPool: try_acquire_owned succeeded but no free slot found — \
237             this would mean semaphore and free-flag counts drifted apart",
238        ))
239    }
240
241    /// Permit → lease conversion shared by `claim()` and `try_claim()`.
242    /// The permit guarantees ≥1 free slot exists; the CAS loop finds
243    /// the first slot we win the race for. With N ≤ 16 GPUs in
244    /// realistic deployments the linear scan is faster than any
245    /// index-tracking scheme.
246    ///
247    /// Returns `Some(lease)` on the only correct path. Returns `None`
248    /// only if the semaphore and free-flag counts drifted apart,
249    /// which the pool's invariants forbid (`claim` panics via
250    /// `unreachable!` in that case; `try_claim` propagates as a
251    /// distinguishable "this would never happen" via `expect`).
252    fn assign_free_slot(&self, permit: OwnedSemaphorePermit) -> Option<GpuLease> {
253        for (slot_idx, slot) in self.free.iter().enumerate() {
254            if slot
255                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
256                .is_ok()
257            {
258                return Some(GpuLease {
259                    gpu_index: self.gpu_indices[slot_idx],
260                    vendor: self.gpu_vendors[slot_idx],
261                    slot_idx,
262                    free: Arc::clone(&self.free),
263                    _permit: permit,
264                });
265            }
266        }
267        None
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use codec::gpu::GpuVendor;

    /// How long a test waits for a spawned claimer to reach (or
    /// leave) its park point inside `claim()`.
    const PARK_WAIT: std::time::Duration = std::time::Duration::from_millis(50);

    /// Builds a synthetic device. Only `index`, `vendor`, and `name`
    /// are read by the pool; the remaining fields are placeholders.
    fn fake_device(vendor: GpuVendor, name: String, index: u32) -> GpuDevice {
        GpuDevice {
            index,
            vendor,
            name,
            generation: "Synth".into(),
            pci_id: String::new(),
            vram_mib: 0,
            serial: None,
            host_pci_address: String::new(),
            vendor_id_hex: String::new(),
        }
    }

    /// Synthetic NVIDIA device named `synth-{index}`.
    fn synth(index: u32) -> GpuDevice {
        fake_device(GpuVendor::Nvidia, format!("synth-{index}"), index)
    }

    /// Synthetic Intel device named `intel-{index}`.
    fn synth_intel(index: u32) -> GpuDevice {
        fake_device(GpuVendor::Intel, format!("intel-{index}"), index)
    }

    /// Shared pool over the given devices.
    fn pool_of(devices: &[GpuDevice]) -> Arc<GpuPool> {
        Arc::new(GpuPool::new(devices))
    }

    /// Spawns a task that claims from `pool` and yields the lease.
    fn spawn_claim(pool: &Arc<GpuPool>) -> tokio::task::JoinHandle<GpuLease> {
        let pool = Arc::clone(pool);
        tokio::spawn(async move { pool.claim().await.unwrap() })
    }

    /// Gives spawned tasks time to run up to their next park point.
    async fn park_wait() {
        tokio::time::sleep(PARK_WAIT).await;
    }

    #[tokio::test]
    async fn empty_pool_returns_none() {
        let pool = pool_of(&[]);
        assert!(pool.claim().await.is_none());
        assert_eq!(pool.capacity(), 0);
    }

    #[tokio::test]
    async fn single_gpu_serializes_claims() {
        let pool = pool_of(&[synth(0)]);
        let first = pool.claim().await.unwrap();
        assert_eq!(first.gpu_index, 0);

        // The second claim has to wait. Give it a short window and
        // check it has NOT resolved while the first lease is held.
        let second = spawn_claim(&pool);
        park_wait().await;
        assert!(
            !second.is_finished(),
            "second claim resolved while lease held"
        );

        drop(first);
        let second = second.await.unwrap();
        assert_eq!(second.gpu_index, 0);
    }

    #[tokio::test]
    async fn two_gpus_concurrent_leases_distinct_indices() {
        let pool = pool_of(&[synth(0), synth(1)]);
        let first = pool.claim().await.unwrap();
        let second = pool.claim().await.unwrap();
        assert_ne!(first.gpu_index, second.gpu_index);
    }

    #[tokio::test]
    async fn third_claim_waits_until_one_drops() {
        let pool = pool_of(&[synth(0), synth(1)]);
        let first = pool.claim().await.unwrap();
        let _second = pool.claim().await.unwrap();

        let third = spawn_claim(&pool);
        park_wait().await;
        assert!(!third.is_finished());

        // Releasing the first lease frees exactly that device, so the
        // parked claimer must end up on it.
        let released_index = first.gpu_index;
        drop(first);

        let third = third.await.unwrap();
        assert_eq!(third.gpu_index, released_index);
    }

    #[tokio::test]
    async fn two_intel_arc_cards_both_get_intel_leases() {
        // 2× Arc, 0× NVIDIA. Each card has its own per-vendor index.
        // Both leases must report vendor=Intel with distinct indices
        // so the encoder factory's pick_vendor_device(Intel,
        // Some(0/1)) resolves the right physical card per lease.
        let pool = pool_of(&[synth_intel(0), synth_intel(1)]);
        let first = pool.claim().await.unwrap();
        let second = pool.claim().await.unwrap();
        assert_eq!(first.vendor, GpuVendor::Intel);
        assert_eq!(second.vendor, GpuVendor::Intel);

        let mut seen = [first.gpu_index, second.gpu_index];
        seen.sort_unstable();
        assert_eq!(seen, [0, 1]);
    }

    #[tokio::test]
    async fn two_nvidia_cards_both_get_nvidia_leases() {
        // 2× NVIDIA, 0× Arc. Mirror image of the Intel-Intel case.
        let pool = pool_of(&[synth(0), synth(1)]);
        let first = pool.claim().await.unwrap();
        let second = pool.claim().await.unwrap();
        assert_eq!(first.vendor, GpuVendor::Nvidia);
        assert_eq!(second.vendor, GpuVendor::Nvidia);

        let mut seen = [first.gpu_index, second.gpu_index];
        seen.sort_unstable();
        assert_eq!(seen, [0, 1]);
    }

    #[tokio::test]
    async fn lease_carries_vendor_for_dispatch() {
        // Multi-vendor host: NVIDIA at index 0 + Intel at index 0.
        // Without the vendor on the lease, the encoder factory's
        // NVIDIA-first dispatch would always have picked NVENC. With
        // it, each lease tells the factory which backend to use.
        fn rank(vendor: &GpuVendor) -> u8 {
            match vendor {
                GpuVendor::Nvidia => 0,
                GpuVendor::Amd => 1,
                GpuVendor::Intel => 2,
            }
        }

        let pool = pool_of(&[synth(0), synth_intel(0)]);
        let first = pool.claim().await.unwrap();
        let second = pool.claim().await.unwrap();

        // The test does not depend on which slot is handed out first;
        // it only requires each vendor to appear exactly once.
        let mut vendors = vec![first.vendor, second.vendor];
        vendors.sort_by_key(rank);
        assert_eq!(vendors, vec![GpuVendor::Nvidia, GpuVendor::Intel]);
    }

    #[tokio::test]
    async fn snapshot_leases_reflects_current_state() {
        // Phase 2 contract: one entry per slot in construction order,
        // with `leased` mirroring the live free flag.
        let pool = pool_of(&[synth(0), synth_intel(1)]);

        let before = pool.snapshot_leases();
        assert_eq!(before.len(), 2);
        assert_eq!(before[0].index, 0);
        assert_eq!(before[0].vendor, GpuVendor::Nvidia);
        assert!(!before[0].leased);
        assert_eq!(before[1].index, 1);
        assert_eq!(before[1].vendor, GpuVendor::Intel);
        assert!(!before[1].leased);

        // Take one lease. The assertion deliberately does not pin
        // which slot was handed out: with N=2 and one outstanding
        // lease, exactly one entry must report `leased=true`.
        let lease = pool.claim().await.unwrap();
        let during = pool.snapshot_leases();
        let leased_slots = during.iter().filter(|entry| entry.leased).count();
        assert_eq!(leased_slots, 1);

        drop(lease);
        let after = pool.snapshot_leases();
        assert!(after.iter().all(|entry| !entry.leased));
    }

    #[tokio::test]
    async fn snapshot_leases_empty_for_cpu_host() {
        let pool = pool_of(&[]);
        assert!(pool.snapshot_leases().is_empty());
    }

    #[tokio::test]
    async fn snapshot_leases_carries_device_name() {
        // The Phase 2 load-tick task reads `.name` straight off the
        // snapshot to build the worker_load frame's gpu_pool entry,
        // so it must be the real GpuDevice.name (not the stringified
        // vendor).
        let pool = pool_of(&[synth(0)]);
        let snapshot = pool.snapshot_leases();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].name, "synth-0");
    }

    // ---- pending_claimers + try_claim (2026-05-10) ----

    #[tokio::test]
    async fn pending_claimers_starts_at_zero() {
        let pool = pool_of(&[synth(0), synth(1)]);
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn pending_claimers_zero_after_unblocked_claim() {
        // One GPU, one immediate claim: nothing ever blocks, so the
        // count reads 0 both before AND after the claim.
        let pool = pool_of(&[synth(0)]);
        assert_eq!(pool.pending_claimers(), 0);
        let _lease = pool.claim().await.unwrap();
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn pending_claimers_increments_during_blocked_claim() {
        // Take the only GPU, then spawn a second claim. That task
        // parks in `acquire_owned().await` and must be counted.
        let pool = pool_of(&[synth(0)]);
        let held = pool.claim().await.unwrap();
        assert_eq!(pool.pending_claimers(), 0);

        let waiter = spawn_claim(&pool);

        // Let the spawned task reach the await.
        park_wait().await;
        assert_eq!(
            pool.pending_claimers(),
            1,
            "blocked claimer should be counted",
        );

        // Releasing the lease lets the blocked claimer resume.
        drop(held);
        let _resumed = waiter.await.unwrap();
        assert_eq!(
            pool.pending_claimers(),
            0,
            "after resume, blocked count returns to 0",
        );
    }

    #[tokio::test]
    async fn pending_claimers_increments_for_multiple_blockers() {
        // 1 GPU, 3 claimers (1 immediate, 2 blocked): the count reads
        // 2 while both are parked.
        let pool = pool_of(&[synth(0)]);
        let held = pool.claim().await.unwrap();

        // Both handles stay bound until the end of the test.
        // Presumably the first waiter's lease then lives on in its
        // unjoined handle, keeping the second waiter parked.
        let _first_waiter = spawn_claim(&pool);
        let _second_waiter = spawn_claim(&pool);

        park_wait().await;
        assert_eq!(pool.pending_claimers(), 2);

        drop(held);
        // The first waiter resumes, the second stays parked: 2 → 1.
        park_wait().await;
        assert_eq!(pool.pending_claimers(), 1);
    }

    #[tokio::test]
    async fn pending_claimers_decrements_under_cancellation() {
        // Park a claim, then abort its task before the await
        // resolves. The PendingClaimGuard's Drop must still run and
        // return the count to 0.
        let pool = pool_of(&[synth(0)]);
        let _held = pool.claim().await.unwrap();

        let parked = {
            let pool = Arc::clone(&pool);
            tokio::spawn(async move { pool.claim().await })
        };
        park_wait().await;
        assert_eq!(pool.pending_claimers(), 1);

        parked.abort();
        // Aborting drops the future, and with it the guard held
        // across the await. Joining yields JoinError(Cancelled); the
        // extra sleep leaves a scheduler tick for the drop to land.
        let _ = parked.await;
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        assert_eq!(
            pool.pending_claimers(),
            0,
            "cancelled claim must still decrement pending_claimers",
        );
    }

    #[tokio::test]
    async fn try_claim_returns_none_when_pool_full() {
        // Every permit is taken, so try_claim yields None.
        let pool = pool_of(&[synth(0)]);
        let _held = pool.claim().await.unwrap();
        assert!(pool.try_claim().is_none());
    }

    #[tokio::test]
    async fn try_claim_returns_lease_when_capacity_available() {
        let pool = pool_of(&[synth(0), synth(1)]);
        let first = pool.try_claim().unwrap();
        let second = pool.try_claim().unwrap();
        assert_ne!(first.gpu_index, second.gpu_index);
        assert!(
            pool.try_claim().is_none(),
            "after both GPUs leased, third try_claim must be None",
        );
    }

    #[tokio::test]
    async fn try_claim_returns_none_on_cpu_only_host() {
        let pool = pool_of(&[]);
        assert!(pool.try_claim().is_none());
    }

    #[tokio::test]
    async fn try_claim_does_not_steal_from_blocked_claimer() {
        // The contract the LeaseArbiter relies on: when a variant
        // task is parked in `claim()`'s `acquire_owned().await` and a
        // permit becomes available, the parked variant gets it FIRST.
        // A racing `try_claim()` must come back empty-handed.
        //
        // Tokio's Semaphore is documented as FIFO for
        // `acquire_owned`: released permits are reserved for queued
        // waiters and are NOT visible to `try_acquire_owned()`. This
        // test verifies that empirically, as a guard against an
        // accidental regression (e.g. someone swapping in a non-fair
        // semaphore).
        let pool = pool_of(&[synth(0)]);
        let held = pool.claim().await.unwrap();

        // Park a blocked claimer.
        let blocked = spawn_claim(&pool);
        park_wait().await;
        assert_eq!(pool.pending_claimers(), 1);

        // Release the lease; per the FIFO contract the permit is now
        // reserved for the parked claimer.
        drop(held);

        // Attempting to grab it out from under the claimer must fail.
        assert!(
            pool.try_claim().is_none(),
            "try_claim must not steal a permit reserved for a queued claimer",
        );

        // The blocked claimer itself must still resolve.
        let _resumed = blocked.await.unwrap();
    }

    #[tokio::test]
    async fn try_claim_lease_drop_releases_permit() {
        // Leases from try_claim share the same Drop path; the permit
        // must be back in the pool once the lease is dropped.
        let pool = pool_of(&[synth(0)]);
        let lease = pool.try_claim().unwrap();
        assert!(pool.try_claim().is_none());
        drop(lease);
        assert!(pool.try_claim().is_some(), "permit returned to pool after lease drop");
    }

    #[tokio::test]
    async fn try_claim_does_not_affect_pending_claimers() {
        // try_claim must leave pending_claimers alone — helpers are
        // opportunistic, not blocked claimers.
        let pool = pool_of(&[synth(0)]);
        let _held = pool.try_claim().unwrap();
        assert_eq!(pool.pending_claimers(), 0);
        assert!(pool.try_claim().is_none());
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn sparse_indices_preserved() {
        // CUDA_VISIBLE_DEVICES could expose only [0, 2, 5].
        let pool = pool_of(&[synth(0), synth(2), synth(5)]);
        let first = pool.claim().await.unwrap();
        let second = pool.claim().await.unwrap();
        let third = pool.claim().await.unwrap();

        let mut seen = [first.gpu_index, second.gpu_index, third.gpu_index];
        seen.sort_unstable();
        assert_eq!(seen, [0, 2, 5]);
    }
}