rivet/gpu_pool.rs
1//! Process-wide GPU reservation pool.
2//!
3//! Each detected GPU is a slot. Callers `claim()` an available slot
4//! and hold the returned `GpuLease` for the duration of their work;
5//! `Drop` releases the slot back to the pool. The lease's
6//! `gpu_index` field is the device index the work should run on.
7//!
8//! Concurrency model: one variant per GPU at any time. With N GPUs
9//! and M waiters, the first N waiters get leases immediately and the
10//! remaining M−N park on the semaphore until a lease drops. This is
11//! the deliberate design decision from 2026-05-02 — concurrent
12//! NVENC sessions on the same CUDA context deadlocked at session
13//! ~5/5 init, GPU went idle, no frames encoded. One-encoder-per-GPU
14//! is the load-bearing invariant; the pool's role is to enforce it
15//! while still letting variants run in parallel ACROSS GPUs.
16//!
17//! CPU-only hosts (no GPUs detected): `claim()` returns `None`
18//! immediately — callers fall back to CPU encode without queuing.
19
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
23use codec::gpu::{GpuDevice, GpuVendor};
24use tokio::sync::{OwnedSemaphorePermit, Semaphore};
25
26pub struct GpuPool {
27 /// Per-slot GPU device index (`GpuDevice.index`, not vec position
28 /// — accommodates sparse `CUDA_VISIBLE_DEVICES` setups).
29 gpu_indices: Vec<u32>,
30 /// Per-slot vendor — load-bearing for the encoder factory's
31 /// vendor-aware dispatch. Without it, multi-vendor hosts (NVIDIA
32 /// + Intel Arc) ALWAYS picked NVENC because the factory tries
33 /// NVIDIA first and both vendors expose index 0; the Arc sat
34 /// idle even when the NVIDIA card was busy.
35 gpu_vendors: Vec<GpuVendor>,
36 /// Per-slot human-readable device name. Used by `snapshot_leases`
37 /// (Phase 2 worker_load reporting) so the backend's admin view
38 /// can label each GPU lease badge with the same string the hello
39 /// frame already advertised. Stays in lockstep with the hello
40 /// frame's `WsGpuInfo.name`.
41 gpu_names: Vec<String>,
42 /// Per-slot free flag. `true` = available; `false` = leased.
43 /// Atomic so the CAS-find-free-slot path under `claim()` is
44 /// lock-free; correctness is enforced by the semaphore counting
45 /// (see `claim`).
46 free: Arc<Vec<AtomicBool>>,
47 /// Semaphore with N permits (= number of GPUs). Acquiring a
48 /// permit guarantees at least one `free` slot exists, so the
49 /// CAS loop in `claim()` always succeeds without retry.
50 permits: Arc<Semaphore>,
51 /// Count of variant tasks currently blocked inside `claim()`'s
52 /// `acquire_owned().await`. Used by the LeaseArbiter (planned
53 /// 2026-05-10) to decide whether to dispatch a helper task: if
54 /// any variant is already waiting for a permit, that variant
55 /// must claim before the arbiter steals a permit for a helper.
56 /// Incremented immediately before `acquire_owned().await`;
57 /// decremented as soon as the await returns (success or
58 /// cancellation) via the `PendingClaimGuard` RAII helper.
59 ///
60 /// `try_claim()` does NOT touch this — helpers are not blocked
61 /// claimers in the spare-capacity sense.
62 pending_claimers: Arc<AtomicUsize>,
63}
64
/// Scope guard that raises a shared counter when created and lowers it
/// again when dropped.
///
/// `claim()` wraps its permit wait in one of these. If the awaiting task
/// is cancelled (its future dropped mid-await), the guard's `Drop` still
/// runs and the counter stays accurate.
struct PendingClaimGuard {
    /// Shared counter this guard adjusts (`GpuPool::pending_claimers`).
    counter: Arc<AtomicUsize>,
}

impl PendingClaimGuard {
    /// Register one pending claimer on `counter` and return the guard
    /// that will unregister it when dropped.
    fn new(counter: Arc<AtomicUsize>) -> Self {
        let guard = Self { counter };
        guard.counter.fetch_add(1, Ordering::AcqRel);
        guard
    }
}

impl Drop for PendingClaimGuard {
    /// Unregister the pending claimer recorded by `new`.
    fn drop(&mut self) {
        let counter = &self.counter;
        counter.fetch_sub(1, Ordering::AcqRel);
    }
}
86
87/// Snapshot of one GPU slot's lease state at a moment in time.
88/// Returned by [`GpuPool::snapshot_leases`] for Phase 2 worker_load
89/// reporting. Field shape matches `queue::WsGpuLeaseEntry` so the
90/// caller can map across without a wire-format-aware translation.
91#[derive(Debug, Clone)]
92pub struct GpuLeaseEntry {
93 pub vendor: GpuVendor,
94 pub name: String,
95 pub index: u32,
96 pub leased: bool,
97}
98
99/// RAII guard returned by `GpuPool::claim`. The slot is released
100/// (and the underlying semaphore permit dropped) when this value
101/// is dropped — typically at the end of the variant's encode task.
102pub struct GpuLease {
103 pub gpu_index: u32,
104 pub vendor: GpuVendor,
105 slot_idx: usize,
106 free: Arc<Vec<AtomicBool>>,
107 _permit: OwnedSemaphorePermit,
108}
109
110impl Drop for GpuLease {
111 fn drop(&mut self) {
112 self.free[self.slot_idx].store(true, Ordering::Release);
113 }
114}
115
116impl GpuPool {
117 /// Build a pool from the host's detected GPU inventory. An empty
118 /// inventory is permitted — the resulting pool always returns
119 /// `None` from `claim()` so CPU-only hosts work without
120 /// special-casing at the call site.
121 pub fn new(devices: &[GpuDevice]) -> Self {
122 let n = devices.len();
123 Self {
124 gpu_indices: devices.iter().map(|d| d.index).collect(),
125 gpu_vendors: devices.iter().map(|d| d.vendor).collect(),
126 gpu_names: devices.iter().map(|d| d.name.clone()).collect(),
127 free: Arc::new((0..n).map(|_| AtomicBool::new(true)).collect()),
128 // Semaphore::new(0) is valid but `acquire` would deadlock.
129 // We never acquire on the empty path because `claim()`
130 // returns `None` early on CPU-only hosts.
131 permits: Arc::new(Semaphore::new(n)),
132 pending_claimers: Arc::new(AtomicUsize::new(0)),
133 }
134 }
135
136 /// How many variant tasks are currently parked inside `claim()`
137 /// waiting for a permit. The LeaseArbiter consults this to decide
138 /// whether to dispatch a helper: when `pending_claimers() > 0`,
139 /// at least one variant task wants a GPU and the arbiter must
140 /// step back so the variant claims first (FIFO fairness).
141 ///
142 /// Reads with `Ordering::Acquire`. The result is momentary — by
143 /// the time the caller observes it, a claim may have resolved or
144 /// a new one parked. That's expected; the arbiter re-checks
145 /// before each dispatch decision.
146 pub fn pending_claimers(&self) -> usize {
147 self.pending_claimers.load(Ordering::Acquire)
148 }
149
150 /// How many GPUs this pool manages. Useful for pre-spawning
151 /// variants when fewer variants exist than GPUs (no point
152 /// over-claiming).
153 pub fn capacity(&self) -> usize {
154 self.gpu_indices.len()
155 }
156
157 /// Snapshot per-GPU lease state. Result preserves slot order
158 /// (matches the order [`GpuPool::new`] saw devices), so callers
159 /// stitching the result against the hello frame's `gpu_pool`
160 /// see consistent indices across both reports.
161 ///
162 /// Reads `free` slots with `Ordering::Acquire`. The result is a
163 /// momentary snapshot — by the time the caller observes it, a
164 /// claim or drop may have flipped any slot. That's expected;
165 /// load reporting is best-effort observability, not a
166 /// transactional view.
167 ///
168 /// Used by the worker's Phase 2 (2026-05-07) load-tick task to
169 /// build the `worker_load` frame's `gpu_pool` field.
170 pub fn snapshot_leases(&self) -> Vec<GpuLeaseEntry> {
171 self.gpu_indices
172 .iter()
173 .zip(self.gpu_vendors.iter())
174 .zip(self.gpu_names.iter())
175 .enumerate()
176 .map(|(slot_idx, ((index, vendor), name))| GpuLeaseEntry {
177 vendor: *vendor,
178 name: name.clone(),
179 index: *index,
180 leased: !self.free[slot_idx].load(Ordering::Acquire),
181 })
182 .collect()
183 }
184
185 /// Claim an available GPU. Awaits if every GPU is currently
186 /// leased. Returns `None` immediately on CPU-only hosts — the
187 /// caller should fall back to CPU encode.
188 pub async fn claim(self: &Arc<Self>) -> Option<GpuLease> {
189 if self.gpu_indices.is_empty() {
190 return None;
191 }
192 // Track "blocked waiting for a permit" for the LeaseArbiter's
193 // fairness check. Guard is scoped to the await: on success the
194 // guard drops at end-of-block (decrement); on cancellation the
195 // future is dropped mid-await, the guard drops, and the
196 // counter still decrements. Either way the count stays
197 // accurate.
198 let permit = {
199 let _pending = PendingClaimGuard::new(Arc::clone(&self.pending_claimers));
200 Arc::clone(&self.permits)
201 .acquire_owned()
202 .await
203 .expect("GpuPool semaphore should never be closed")
204 };
205 // The permit guarantees ≥1 free slot. None here means the
206 // semaphore count and free-flag count drifted apart — a bug
207 // (RAII Drop bypassed, wrong atomic ordering, etc.).
208 match self.assign_free_slot(permit) {
209 Some(lease) => Some(lease),
210 None => unreachable!(
211 "GpuPool: permit acquired but no free slot found — \
212 semaphore count and free-flag count drifted apart"
213 ),
214 }
215 }
216
217 /// Try to claim a GPU without blocking. Returns `None` if every
218 /// GPU is currently leased OR if the host has no GPUs.
219 ///
220 /// Used by the LeaseArbiter (planned 2026-05-10) to grab a helper
221 /// lease without contending with blocked variant tasks. Tokio's
222 /// Semaphore preserves FIFO ordering for queued waiters — a
223 /// permit released while a variant task is parked in
224 /// `acquire_owned().await` is reserved for that waiter and is NOT
225 /// visible to `try_acquire_owned()`, so this method cannot steal
226 /// a permit out from under a queued variant.
227 ///
228 /// Does NOT increment `pending_claimers`; helpers are not blocked
229 /// claimers in the spare-capacity sense.
230 pub fn try_claim(self: &Arc<Self>) -> Option<GpuLease> {
231 if self.gpu_indices.is_empty() {
232 return None;
233 }
234 let permit = Arc::clone(&self.permits).try_acquire_owned().ok()?;
235 Some(self.assign_free_slot(permit).expect(
236 "GpuPool: try_acquire_owned succeeded but no free slot found — \
237 this would mean semaphore and free-flag counts drifted apart",
238 ))
239 }
240
241 /// Permit → lease conversion shared by `claim()` and `try_claim()`.
242 /// The permit guarantees ≥1 free slot exists; the CAS loop finds
243 /// the first slot we win the race for. With N ≤ 16 GPUs in
244 /// realistic deployments the linear scan is faster than any
245 /// index-tracking scheme.
246 ///
247 /// Returns `Some(lease)` on the only correct path. Returns `None`
248 /// only if the semaphore and free-flag counts drifted apart,
249 /// which the pool's invariants forbid (`claim` panics via
250 /// `unreachable!` in that case; `try_claim` propagates as a
251 /// distinguishable "this would never happen" via `expect`).
252 fn assign_free_slot(&self, permit: OwnedSemaphorePermit) -> Option<GpuLease> {
253 for (slot_idx, slot) in self.free.iter().enumerate() {
254 if slot
255 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
256 .is_ok()
257 {
258 return Some(GpuLease {
259 gpu_index: self.gpu_indices[slot_idx],
260 vendor: self.gpu_vendors[slot_idx],
261 slot_idx,
262 free: Arc::clone(&self.free),
263 _permit: permit,
264 });
265 }
266 }
267 None
268 }
269}
270
/// Unit tests for `GpuPool`: slot accounting, FIFO hand-off to parked
/// claimers, `pending_claimers` bookkeeping, `try_claim`, and snapshots.
///
/// Several tests spawn a claim and then sleep briefly so the spawned task
/// can run and park on the semaphore before the assertions. These tests
/// depend on the exact order of spawn / sleep / drop / await.
#[cfg(test)]
mod tests {
    use super::*;
    use codec::gpu::GpuVendor;

    /// Synthetic NVIDIA device with the given index. `name` is
    /// `synth-{index}`; every other field is a placeholder.
    fn synth(index: u32) -> GpuDevice {
        GpuDevice {
            index,
            vendor: GpuVendor::Nvidia,
            name: format!("synth-{index}"),
            generation: "Synth".into(),
            pci_id: String::new(),
            vram_mib: 0,
            serial: None,
            host_pci_address: String::new(),
            vendor_id_hex: String::new(),
        }
    }

    /// Synthetic Intel device with the given index. `name` is
    /// `intel-{index}`; otherwise identical to `synth`.
    fn synth_intel(index: u32) -> GpuDevice {
        GpuDevice {
            index,
            vendor: GpuVendor::Intel,
            name: format!("intel-{index}"),
            generation: "Synth".into(),
            pci_id: String::new(),
            vram_mib: 0,
            serial: None,
            host_pci_address: String::new(),
            vendor_id_hex: String::new(),
        }
    }

    #[tokio::test]
    async fn empty_pool_returns_none() {
        // CPU-only host: claim() short-circuits to None, capacity is 0.
        let pool = Arc::new(GpuPool::new(&[]));
        assert!(pool.claim().await.is_none());
        assert_eq!(pool.capacity(), 0);
    }

    #[tokio::test]
    async fn single_gpu_serializes_claims() {
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let lease1 = pool.claim().await.unwrap();
        assert_eq!(lease1.gpu_index, 0);

        // Second claim must wait. Give it 50 ms to run, then assert it
        // has NOT resolved while lease1 is held.
        let pool_clone = Arc::clone(&pool);
        let claim2 = tokio::spawn(async move { pool_clone.claim().await.unwrap() });

        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(
            !claim2.is_finished(),
            "second claim resolved while lease held"
        );

        // Releasing lease1 hands the permit (and slot 0) to the waiter.
        drop(lease1);
        let lease2 = claim2.await.unwrap();
        assert_eq!(lease2.gpu_index, 0);
    }

    #[tokio::test]
    async fn two_gpus_concurrent_leases_distinct_indices() {
        // Two GPUs, two immediate claims: each gets a different slot.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(1)]));
        let lease_a = pool.claim().await.unwrap();
        let lease_b = pool.claim().await.unwrap();
        assert_ne!(lease_a.gpu_index, lease_b.gpu_index);
    }

    #[tokio::test]
    async fn third_claim_waits_until_one_drops() {
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(1)]));
        let lease_a = pool.claim().await.unwrap();
        let _lease_b = pool.claim().await.unwrap();

        // Pool is full; the third claim must park.
        let pool_clone = Arc::clone(&pool);
        let claim_c = tokio::spawn(async move { pool_clone.claim().await.unwrap() });

        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(!claim_c.is_finished());

        // Only lease_a's slot becomes free, so the waiter must land on it.
        let dropped_idx = lease_a.gpu_index;
        drop(lease_a);

        let lease_c = claim_c.await.unwrap();
        assert_eq!(lease_c.gpu_index, dropped_idx);
    }

    #[tokio::test]
    async fn two_intel_arc_cards_both_get_intel_leases() {
        // 2× Arc, 0× NVIDIA. Each card has its own per-vendor index.
        // Both leases come back vendor=Intel and the indices are
        // distinct so the encoder factory's pick_vendor_device(Intel,
        // Some(0/1)) finds the right physical card per lease.
        let pool = Arc::new(GpuPool::new(&[synth_intel(0), synth_intel(1)]));
        let l1 = pool.claim().await.unwrap();
        let l2 = pool.claim().await.unwrap();
        assert_eq!(l1.vendor, GpuVendor::Intel);
        assert_eq!(l2.vendor, GpuVendor::Intel);
        // Slot assignment order isn't part of the contract; compare as a set.
        let mut indices: Vec<u32> = vec![l1.gpu_index, l2.gpu_index];
        indices.sort();
        assert_eq!(indices, vec![0, 1]);
    }

    #[tokio::test]
    async fn two_nvidia_cards_both_get_nvidia_leases() {
        // 2× NVIDIA, 0× Arc. Same shape as the Intel-Intel case.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(1)]));
        let l1 = pool.claim().await.unwrap();
        let l2 = pool.claim().await.unwrap();
        assert_eq!(l1.vendor, GpuVendor::Nvidia);
        assert_eq!(l2.vendor, GpuVendor::Nvidia);
        let mut indices: Vec<u32> = vec![l1.gpu_index, l2.gpu_index];
        indices.sort();
        assert_eq!(indices, vec![0, 1]);
    }

    #[tokio::test]
    async fn lease_carries_vendor_for_dispatch() {
        // Multi-vendor host: NVIDIA at index 0 + Intel at index 0.
        // Without vendor on the lease, the encoder factory's NVIDIA-
        // first dispatch would have always picked NVENC. With vendor,
        // each lease tells the factory which backend to use.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth_intel(0)]));
        let l1 = pool.claim().await.unwrap();
        let l2 = pool.claim().await.unwrap();
        let mut vendors: Vec<GpuVendor> = vec![l1.vendor, l2.vendor];
        // Order is non-deterministic between the two slots; both
        // vendors must appear exactly once.
        vendors.sort_by_key(|v| match v {
            GpuVendor::Nvidia => 0,
            GpuVendor::Amd => 1,
            GpuVendor::Intel => 2,
        });
        assert_eq!(vendors, vec![GpuVendor::Nvidia, GpuVendor::Intel]);
    }

    #[tokio::test]
    async fn snapshot_leases_reflects_current_state() {
        // Phase 2 contract: snapshot returns one entry per slot in
        // construction order; `leased` mirrors the live free-flag.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth_intel(1)]));

        let snap0 = pool.snapshot_leases();
        assert_eq!(snap0.len(), 2);
        assert_eq!(snap0[0].index, 0);
        assert_eq!(snap0[0].vendor, GpuVendor::Nvidia);
        assert!(!snap0[0].leased);
        assert_eq!(snap0[1].index, 1);
        assert_eq!(snap0[1].vendor, GpuVendor::Intel);
        assert!(!snap0[1].leased);

        // Claim one slot → snapshot reflects it.
        let lease = pool.claim().await.unwrap();
        // Order in which slots get claimed isn't strictly tied to
        // vec position, but with N=2 and one outstanding lease the
        // snapshot must show exactly one `leased=true`.
        let snap1 = pool.snapshot_leases();
        let leased_count = snap1.iter().filter(|e| e.leased).count();
        assert_eq!(leased_count, 1);

        // Dropping the lease frees the slot again.
        drop(lease);
        let snap2 = pool.snapshot_leases();
        assert!(snap2.iter().all(|e| !e.leased));
    }

    #[tokio::test]
    async fn snapshot_leases_empty_for_cpu_host() {
        let pool = Arc::new(GpuPool::new(&[]));
        let snap = pool.snapshot_leases();
        assert!(snap.is_empty());
    }

    #[tokio::test]
    async fn snapshot_leases_carries_device_name() {
        // The Phase 2 load-tick task reads .name straight off the
        // snapshot to build the worker_load frame's gpu_pool entry,
        // so the lookup must hit the real GpuDevice.name (not the
        // stringified vendor).
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let snap = pool.snapshot_leases();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].name, "synth-0");
    }

    // ---- pending_claimers + try_claim (2026-05-10) ----

    #[tokio::test]
    async fn pending_claimers_starts_at_zero() {
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(1)]));
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn pending_claimers_zero_after_unblocked_claim() {
        // Single GPU, single immediate claim. It never blocks, so the
        // count should observe 0 both before AND after the claim.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        assert_eq!(pool.pending_claimers(), 0);
        let _lease = pool.claim().await.unwrap();
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn pending_claimers_increments_during_blocked_claim() {
        // 1 GPU, take it; spawn a second claim → that task parks in
        // `acquire_owned().await`; pending_claimers should observe 1.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let lease1 = pool.claim().await.unwrap();
        assert_eq!(pool.pending_claimers(), 0);

        let pool_clone = Arc::clone(&pool);
        let claim2 = tokio::spawn(async move { pool_clone.claim().await.unwrap() });

        // Give the spawned task a moment to enter the await.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(
            pool.pending_claimers(),
            1,
            "blocked claimer should be counted",
        );

        // Release the lease → blocked claimer resumes.
        drop(lease1);
        let _lease2 = claim2.await.unwrap();
        assert_eq!(
            pool.pending_claimers(),
            0,
            "after resume, blocked count returns to 0",
        );
    }

    #[tokio::test]
    async fn pending_claimers_increments_for_multiple_blockers() {
        // 1 GPU, 3 concurrent claimers (1 immediate, 2 blocked) →
        // pending observes 2 while both are parked.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let lease1 = pool.claim().await.unwrap();

        // NOTE(review): each spawned task returns its lease as the task
        // output. The first waiter's lease appears to stay alive (keeping
        // the second waiter parked) only because `_a` / `_b` hold on to
        // their JoinHandles. Binding them to `_` would drop the handles
        // immediately. Confirm before changing these bindings.
        let pool_a = Arc::clone(&pool);
        let _a = tokio::spawn(async move { pool_a.claim().await.unwrap() });
        let pool_b = Arc::clone(&pool);
        let _b = tokio::spawn(async move { pool_b.claim().await.unwrap() });

        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(pool.pending_claimers(), 2);

        drop(lease1);
        // First waiter resumes; second still parked → count goes 2→1.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(pool.pending_claimers(), 1);
    }

    #[tokio::test]
    async fn pending_claimers_decrements_under_cancellation() {
        // Park a claim, then abort the task before the await
        // resolves. The PendingClaimGuard's Drop must still run and
        // bring the count back to 0.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let _lease1 = pool.claim().await.unwrap();

        let pool_clone = Arc::clone(&pool);
        let task = tokio::spawn(async move { pool_clone.claim().await });
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(pool.pending_claimers(), 1);

        task.abort();
        // Abort drops the future, which drops the PendingClaimGuard
        // inside the await scope. Allow a scheduler tick to observe.
        let _ = task.await; // resolves with JoinError(Cancelled)
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        assert_eq!(
            pool.pending_claimers(),
            0,
            "cancelled claim must still decrement pending_claimers",
        );
    }

    #[tokio::test]
    async fn try_claim_returns_none_when_pool_full() {
        // All permits taken → try_claim is None.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let _lease = pool.claim().await.unwrap();
        assert!(pool.try_claim().is_none());
    }

    #[tokio::test]
    async fn try_claim_returns_lease_when_capacity_available() {
        // Two free GPUs → two distinct try_claim leases, then None.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(1)]));
        let lease1 = pool.try_claim().unwrap();
        let lease2 = pool.try_claim().unwrap();
        assert_ne!(lease1.gpu_index, lease2.gpu_index);
        assert!(
            pool.try_claim().is_none(),
            "after both GPUs leased, third try_claim must be None",
        );
    }

    #[tokio::test]
    async fn try_claim_returns_none_on_cpu_only_host() {
        let pool = Arc::new(GpuPool::new(&[]));
        assert!(pool.try_claim().is_none());
    }

    #[tokio::test]
    async fn try_claim_does_not_steal_from_blocked_claimer() {
        // The contract the LeaseArbiter relies on: when a variant
        // task is parked in `claim()`'s `acquire_owned().await` and a
        // permit becomes available, that permit goes to the parked
        // variant FIRST. A racing `try_claim()` must return None.
        //
        // Tokio's Semaphore is documented as FIFO for `acquire_owned`;
        // released permits are reserved for queued waiters and are
        // NOT visible to `try_acquire_owned()`. This test guards
        // against an accidental regression (e.g. someone swapping in
        // a non-fair semaphore) by verifying the behaviour
        // empirically.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let lease1 = pool.claim().await.unwrap();

        // Park a blocked claimer.
        let pool_clone = Arc::clone(&pool);
        let blocked = tokio::spawn(async move { pool_clone.claim().await.unwrap() });
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(pool.pending_claimers(), 1);

        // Release the lease. The released permit is now reserved for
        // the parked claimer per Tokio's FIFO contract.
        drop(lease1);

        // Try to steal it from the blocked claimer — must fail.
        assert!(
            pool.try_claim().is_none(),
            "try_claim must not steal a permit reserved for a queued claimer",
        );

        // The blocked claimer should still resolve.
        let _lease2 = blocked.await.unwrap();
    }

    #[tokio::test]
    async fn try_claim_lease_drop_releases_permit() {
        // try_claim leases use the same RAII Drop path; verify the
        // permit returns to the pool when the lease drops.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let lease = pool.try_claim().unwrap();
        assert!(pool.try_claim().is_none());
        drop(lease);
        assert!(pool.try_claim().is_some(), "permit returned to pool after lease drop");
    }

    #[tokio::test]
    async fn try_claim_does_not_affect_pending_claimers() {
        // try_claim must not touch pending_claimers — helpers are
        // opportunistic, not blocked claimers. Checked on both the
        // success path and the pool-full (None) path.
        let pool = Arc::new(GpuPool::new(&[synth(0)]));
        let _l1 = pool.try_claim().unwrap();
        assert_eq!(pool.pending_claimers(), 0);
        assert!(pool.try_claim().is_none());
        assert_eq!(pool.pending_claimers(), 0);
    }

    #[tokio::test]
    async fn sparse_indices_preserved() {
        // CUDA_VISIBLE_DEVICES could expose only [0, 2, 5]. Leases must
        // report the device indices, not the slot positions 0..3.
        let pool = Arc::new(GpuPool::new(&[synth(0), synth(2), synth(5)]));
        let l0 = pool.claim().await.unwrap();
        let l1 = pool.claim().await.unwrap();
        let l2 = pool.claim().await.unwrap();
        let mut got: Vec<u32> = vec![l0.gpu_index, l1.gpu_index, l2.gpu_index];
        got.sort();
        assert_eq!(got, vec![0, 2, 5]);
    }
}