Skip to main content

go_lib/
select.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `selectgo` — the runtime heart of `select { }`.
3//!
4//! Ported from `runtime/select.go`.
5//!
6//! ## How it works
7//!
//! `selectgo` receives a slice of [`SCase`]s and a `has_default` flag and
//! picks the first case, in random poll order, that can proceed without blocking.
10//!
11//! ```text
12//! 1. Build pollorder  — a random permutation of case indices (fairness).
13//! 2. Build lockorder  — case indices sorted by channel address (deadlock prevention).
14//! 3. Acquire all channel locks in lockorder.
15//! 4. First pass (pollorder): check each case for immediate readiness.
16//!    – buffer op:    perform it, release all locks, return winner.
17//!    – direct handoff (partner waiting): dequeue partner's sudog, perform op,
18//!      release all locks, call goready(partner), return winner.
19//!    – send on closed: release all locks, panic.
20//! 5. If has_default: release all locks, return (CASE_DEFAULT, false).
21//! 6. Blocking path:
22//!    a. For every case, allocate a sudog (is_select=true) and enqueue it.
23//!    b. Reset G.selectdone to 0 and G.param to null.
24//!    c. Release all locks.
25//!    d. gopark(Select).
26//! 7. On wakeup (winner wrote G.param = winning sudog):
27//!    a. Acquire all locks in lockorder.
28//!    b. Dequeue all *losing* sudogs (dequeue_sudog is a no-op if a racing
29//!       channel op already removed them).
30//!    c. Release all locks.
31//!    d. Release all sudogs back to the free list.
32//!    e. Return (winner_index, ok).
33//! ```
34//!
35//! ## Type erasure
36//!
//! Channels are generic (`Hchan<T>`) but `selectgo` must operate over a
//! heterogeneous set of them.  Each [`SCase`] carries five function pointers
//! that are monomorphised at the call site (by the `select!` macro):
40//!
41//! | pointer       | purpose                                        |
42//! |---------------|------------------------------------------------|
43//! | `lock_fn`     | acquire the channel's `RawMutex`               |
44//! | `unlock_fn`   | release the channel's `RawMutex`               |
45//! | `try_fn`      | attempt the channel op while all locks held    |
46//! | `enqueue_fn`  | enqueue a sudog on the channel's wait queue    |
47//! | `dequeue_fn`  | remove a specific sudog (O(1) cleanup)         |
48//!
49//! `chan_ptr` is the type-erased `*const Hchan<T>` used as the channel
50//! identity for deduplication and address-ordered locking.
51//!
52//! ## Sentinel index
53//!
54//! `selectgo` returns `CASE_DEFAULT` (`usize::MAX`) when the default case is
55//! taken.  Channel cases use their 0-based index within the slice.
56
57use std::mem::ManuallyDrop;
58use std::ptr;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61
62use crate::chan::{Hchan, Receiver, Sender};
63use crate::runtime::g::{current_g, G, WaitReason};
64use crate::runtime::park::{gopark, goready};
65use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
66
67// ---------------------------------------------------------------------------
68// Public constants
69// ---------------------------------------------------------------------------
70
/// Sentinel index returned by [`selectgo`] when the default arm is taken.
///
/// The value is `usize::MAX`, which is never a valid index into the `cases`
/// slice, so callers can tell it apart from a real channel case.
pub const CASE_DEFAULT: usize = usize::MAX;
73
74// ---------------------------------------------------------------------------
75// TryResult — outcome of a single case's fast-path attempt
76// ---------------------------------------------------------------------------
77
/// Outcome of one case's fast-path attempt, produced by an [`SCase`]'s
/// `try_fn` while `selectgo` holds every channel lock.
///
/// `selectgo` matches on this value to decide whether to keep polling,
/// return a winner, wake a partner goroutine, or panic.
#[derive(Debug)]
pub(crate) enum TryResult {
    /// The case cannot complete right now; `selectgo` moves on to the next
    /// case in poll order.
    NotReady,

    /// The case completed without waking another goroutine (buffer
    /// read/write, or a receive on a closed and empty channel).
    /// `ok`: true when a value was transferred, false for a closed-channel
    /// receive.
    Done { ok: bool },

    /// The case completed by exchanging a value directly with a waiting
    /// goroutine.  The partner's sudog has already been dequeued and marked
    /// (`success` set, `G.param` pointing at the sudog) but the partner is
    /// not runnable yet: the caller must release all locks and then call
    /// `goready(gp)`.
    Handoff { gp: *mut G, ok: bool },

    /// A send was attempted on a closed channel.  The caller must release
    /// all locks and then `panic!("send on closed channel")`.
    ClosedSend,
}

// SAFETY: TryResult is only ever used in a single goroutine between lock
// acquire and lock release; the raw *mut G is not shared across threads.
unsafe impl Send for TryResult {}
102
103// ---------------------------------------------------------------------------
104// SCase — one arm of a select statement
105// ---------------------------------------------------------------------------
106
/// One channel arm (send or receive) of a `select` statement, in
/// type-erased form.
///
/// Constructed by [`recv_case_of`] / [`send_case_of`]; do not build directly.
/// Those factories fill in the raw pointers and the five function pointers
/// with the monomorphised helpers for the channel's element type `T`.
#[doc(hidden)]
pub struct SCase {
    /// Type-erased `*const Hchan<T>`.  Used as the channel identity for
    /// deduplication and address-ordered locking, and passed as the first
    /// argument to every function pointer below.
    ///
    /// NOTE(review): earlier docs say this is `null` for a default arm, but
    /// no constructor in this file produces a null pointer and `selectgo`
    /// expects channel cases only — confirm whether default arms are ever
    /// represented as an `SCase`.
    pub(crate) chan_ptr: *const (),

    /// The sudog enrolled on this channel while the goroutine is parked.
    /// Set by `selectgo` in the blocking path and reset to `null` once the
    /// sudog has been released; stays `null` on fast-path returns.
    pub(crate) sg: *mut Sudog,

    /// Type-erased value pointer.
    ///
    /// **Send**: `*mut ManuallyDrop<T>` — the value to send (read by the fn pointers).
    /// **Recv**: `*mut Option<T>` — output slot; written as `Some(val)` on a
    ///           successful receive, left as `None` when the channel is closed.
    pub(crate) elem: *mut u8,

    // ─── vtable — filled in by recv_case_of / send_case_of ────────────────

    /// Acquire the channel's lock.  Called with `chan_ptr`.
    pub(crate) lock_fn: unsafe fn(*const ()),

    /// Release the channel's lock.  Called with `chan_ptr`.
    pub(crate) unlock_fn: unsafe fn(*const ()),

    /// Try the channel operation while all locks are held.
    ///
    /// Signature: `(chan_ptr, elem) -> TryResult`
    ///
    /// For a send case, `elem` is `*mut ManuallyDrop<T>` (the value to send).
    /// For a recv case, `elem` is `*mut Option<T>` (the output slot).
    pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,

    /// Enqueue `sg` on the channel's sendq or recvq (under the lock).
    pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),

    /// Remove `sg` from the channel's sendq or recvq (under the lock).
    /// Documented as a no-op if `sg` was already removed by a racing channel
    /// operation; that behaviour lives in the queue's `dequeue_sudog`, which
    /// is not visible from this file.
    pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
}

// SAFETY: SCase is always used within a single goroutine context; the raw
// pointers are only shared via the scheduler under goroutine-exclusion.
unsafe impl Send for SCase {}
156
157// ---------------------------------------------------------------------------
158// Lehmer RNG — tiny PRNG for poll-order shuffling
159// ---------------------------------------------------------------------------
160
161/// A Lehmer (Park–Miller) multiplicative congruential PRNG.
162///
163/// Used only to produce the random poll order; cryptographic quality is
164/// not required.  Seeded from the current goroutine's `goid`.
165struct Lehmer(u64);
166
167impl Lehmer {
168    fn from_goid() -> Self {
169        let gp = current_g();
170        // SAFETY: gp is only dereferenced after the null check.
171        let goid = if gp.is_null() { 1 } else { (unsafe { (*gp).goid }) | 1 };
172        Lehmer(goid | 1) // must be odd and non-zero
173    }
174
175    /// Return a pseudo-random value in `[0, n)`.
176    fn next_usize(&mut self, n: usize) -> usize {
177        // 64-bit Lehmer with multiplier from Knuth TAOCP Vol 2 §3.3.4.
178        self.0 = self.0.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
179        ((self.0 >> 33) as usize) % n
180    }
181}
182
183// ---------------------------------------------------------------------------
184// selectgo
185// ---------------------------------------------------------------------------
186
187/// Run a `select` over the given cases.
188///
189/// `cases` must contain only **channel** cases (send or receive); pass
190/// `has_default = true` if the select has a `default` arm.
191///
192/// Returns `(chosen_index, received_ok)` where:
193/// - `chosen_index` is the 0-based index into `cases`, or [`CASE_DEFAULT`] if
194///   the default arm was taken.
195/// - `received_ok` is `true` for a normal channel recv, `false` if the
196///   channel was closed (and the receive wrote `None` into the slot).  Always
197///   `false` for send/default arms.
198///
199/// # Preconditions
200///
201/// - All `SCase` values must be created by [`recv_case_of`] or [`send_case_of`].
202/// - Must be called from a goroutine stack (not g0 or a bare OS thread).
203///   A `debug_assert` fires in debug builds if this is violated.
204///
205/// This function is intended only for use by the `select!` macro.
206#[doc(hidden)]
207pub fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
208    let n = cases.len();
209
210    // ── 1. Build pollorder (random permutation) ───────────────────────────────
211    let mut pollorder: Vec<usize> = (0..n).collect();
212    let mut rng = Lehmer::from_goid();
213    // Fisher-Yates shuffle.
214    for i in (1..n).rev() {
215        let j = rng.next_usize(i + 1);
216        pollorder.swap(i, j);
217    }
218
219    // ── 2. Build lockorder (sorted by channel address; dedup same channel) ────
220    let mut lockorder: Vec<usize> = (0..n).collect();
221    lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
222    // Deduplicate consecutive equal channels so we don't double-lock.
223    lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);
224
225    // ── 3. Acquire all locks ──────────────────────────────────────────────────
226    for &i in &lockorder {
227        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
228    }
229
230    // ── 4. First pass: check each case in poll order ──────────────────────────
231    for &i in &pollorder {
232        let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
233        match result {
234            TryResult::NotReady => continue,
235
236            TryResult::Done { ok } => {
237                // Buffer op completed under the locks; release all and return.
238                for &j in &lockorder {
239                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
240                }
241                return (i, ok);
242            }
243
244            TryResult::Handoff { gp, ok } => {
245                // Partner dequeued and set up; release locks, wake partner.
246                for &j in &lockorder {
247                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
248                }
249                unsafe { goready(gp) };
250                return (i, ok);
251            }
252
253            TryResult::ClosedSend => {
254                for &j in &lockorder {
255                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
256                }
257                panic!("send on closed channel");
258            }
259        }
260    }
261
262    // ── 5. Default case ───────────────────────────────────────────────────────
263    if has_default {
264        for &i in &lockorder {
265            unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
266        }
267        return (CASE_DEFAULT, false);
268    }
269
270    // ── 6. Blocking path: enqueue sudogs on all channels ─────────────────────
271    let gp = current_g();
272    debug_assert!(!gp.is_null(), "selectgo: called from g0");
273
274    for case in cases.iter_mut() {
275        let sg = acquire_sudog();
276        unsafe {
277            (*sg).g         = gp;
278            (*sg).elem      = case.elem;
279            (*sg).is_select = true;
280            (*sg).success   = false;
281            (*sg).c         = case.chan_ptr as *mut u8;
282        }
283        case.sg = sg;
284        unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
285    }
286
287    // Reset selectdone so this goroutine can be claimed by exactly one case.
288    unsafe { (*gp).selectdone.store(0, Ordering::Release) };
289    unsafe { (*gp).param = ptr::null_mut() };
290
291    // ── 6c. Release all locks and park ────────────────────────────────────────
292    for &i in &lockorder {
293        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
294    }
295
296    gopark(WaitReason::Select);
297
298    // ── 7. Woken: find winner, clean up losers ────────────────────────────────
299    //
300    // The winning channel operation stored the winning sudog in G.param.
301    let sg_winner = unsafe { (*gp).param as *mut Sudog };
302    unsafe { (*gp).param = ptr::null_mut() };
303    let ok = unsafe { (*sg_winner).success };
304
305    // Identify which case won.
306    let winner = cases
307        .iter()
308        .position(|c| c.sg == sg_winner)
309        .expect("selectgo: winning sudog not found in cases");
310
311    // 7a. Re-acquire all locks.
312    for &i in &lockorder {
313        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
314    }
315
316    // 7b. Dequeue all losing sudogs from their channels.
317    for (i, case) in cases.iter_mut().enumerate() {
318        if i == winner { continue; }
319        let sg = case.sg;
320        unsafe { (case.dequeue_fn)(case.chan_ptr, sg) };
321    }
322
323    // 7c. Release all locks.
324    for &i in &lockorder {
325        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
326    }
327
328    // 7d. Release all sudogs back to the pool.
329    for case in cases.iter_mut() {
330        let sg = case.sg;
331        case.sg = ptr::null_mut();
332        unsafe {
333            (*sg).g    = ptr::null_mut();
334            (*sg).elem = ptr::null_mut();
335            (*sg).c    = ptr::null_mut();
336            release_sudog(sg);
337        }
338    }
339
340    (winner, ok)
341}
342
343// ---------------------------------------------------------------------------
344// Generic vtable functions — monomorphised for each T at the call site
345// ---------------------------------------------------------------------------
346
347pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
348    // Suppress SIGURG-driven async preemption while the channel spinlock is
349    // held — same rationale as `LockGuard::new`.  `selectgo` may hold several
350    // chan locks at once; each `lock_chan`/`unlock_chan` pair bumps and then
351    // decrements `m.locks`, so the counter is back to zero once all locks
352    // are released.
353    std::mem::forget(crate::runtime::m::m_lock());
354    (*(p as *const Hchan<T>)).mutex.lock();
355}
356
357pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
358    (*(p as *const Hchan<T>)).mutex.unlock();
359    // Manual `m.locks -= 1` to match the `mem::forget`ed guard in
360    // `lock_chan`.  We avoid constructing/dropping an MLockGuard here
361    // because the lock/unlock are split across two separate functions.
362    let mp = crate::runtime::m::current_m();
363    if !mp.is_null() { (*mp).locks -= 1; }
364}
365
366pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
367    p: *const (),
368    elem: *mut u8,
369) -> TryResult {
370    let hchan = &*(p as *const Hchan<T>);
371    let state = &mut *hchan.state.get();
372
373    if state.closed {
374        return TryResult::ClosedSend;
375    }
376
377    // Waiting receiver?
378    let recv_sg = state.recvq.dequeue();
379    if !recv_sg.is_null() {
380        let gp = (*recv_sg).g;
381        // elem is *mut ManuallyDrop<T> (send slot); recv_sg.elem is *mut Option<T>.
382        let ep = (*recv_sg).elem as *mut Option<T>;
383        if !ep.is_null() {
384            *ep = Some(ptr::read(elem as *const T));
385        }
386        (*recv_sg).success = true;
387        (*gp).param        = recv_sg as *mut u8;
388        return TryResult::Handoff { gp, ok: true };
389    }
390
391    // Buffer space?
392    if state.buf.len() < state.cap {
393        state.buf.push_back(ptr::read(elem as *const T));
394        return TryResult::Done { ok: true };
395    }
396
397    TryResult::NotReady
398}
399
400pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
401    p: *const (),
402    elem: *mut u8, // *mut Option<T>
403) -> TryResult {
404    let hchan = &*(p as *const Hchan<T>);
405    let state = &mut *hchan.state.get();
406
407    // Waiting sender?
408    let send_sg = state.sendq.dequeue();
409    if !send_sg.is_null() {
410        let gp    = (*send_sg).g;
411        // send_sg.elem is *mut ManuallyDrop<T>; use ManuallyDrop explicitly so
412        // Box::from_raw for the boxed path does not run T's destructor.
413        let ep    = (*send_sg).elem as *mut ManuallyDrop<T>;
414        let boxed = (*send_sg).boxed_elem;
415        let val = if state.cap == 0 {
416            let v = ManuallyDrop::into_inner(ptr::read(ep));
417            if boxed { let _ = Box::from_raw(ep); }
418            (*send_sg).elem = ptr::null_mut();
419            v
420        } else {
421            let head = state.buf.pop_front().unwrap();
422            let sv   = ManuallyDrop::into_inner(ptr::read(ep));
423            if boxed { let _ = Box::from_raw(ep); }
424            (*send_sg).elem = ptr::null_mut();
425            state.buf.push_back(sv);
426            head
427        };
428        *(elem as *mut Option<T>) = Some(val);
429        (*send_sg).success = true;
430        (*gp).param        = send_sg as *mut u8;
431        return TryResult::Handoff { gp, ok: true };
432    }
433
434    // Buffer data?
435    if !state.buf.is_empty() {
436        let val = state.buf.pop_front().unwrap();
437        *(elem as *mut Option<T>) = Some(val);
438        return TryResult::Done { ok: true };
439    }
440
441    // Closed and empty → elem stays None; caller checks ok=false.
442    if state.closed {
443        return TryResult::Done { ok: false };
444    }
445
446    TryResult::NotReady
447}
448
449pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
450    let hchan = &*(p as *const Hchan<T>);
451    (*hchan.state.get()).sendq.enqueue(sg);
452}
453
454pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
455    let hchan = &*(p as *const Hchan<T>);
456    (*hchan.state.get()).recvq.enqueue(sg);
457}
458
459pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
460    let hchan = &*(p as *const Hchan<T>);
461    (*hchan.state.get()).sendq.dequeue_sudog(sg);
462}
463
464pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
465    let hchan = &*(p as *const Hchan<T>);
466    (*hchan.state.get()).recvq.dequeue_sudog(sg);
467}
468
469// ---------------------------------------------------------------------------
470// Public factory functions — used by the select! macro
471// ---------------------------------------------------------------------------
472
473/// Build a receive [`SCase`] for use in [`selectgo`].
474///
475/// `slot` must point to an `Option<T>` initialised to `None` that outlives the
476/// `selectgo` call.  On a successful receive (`ok = true`) the slot is
477/// `Some(value)`; on `ok = false` (channel closed) the slot remains `None`.
478///
479/// Called by the `select!` macro; not intended for direct use.
480#[doc(hidden)]
481pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut Option<T>) -> SCase {
482    SCase {
483        chan_ptr:    Arc::as_ptr(rx.hchan()) as *const (),
484        sg:          ptr::null_mut(),
485        elem:        slot as *mut u8,
486        lock_fn:     lock_chan::<T>,
487        unlock_fn:   unlock_chan::<T>,
488        try_fn:      try_recv_chan::<T>,
489        enqueue_fn:  enqueue_recv_chan::<T>,
490        dequeue_fn:  dequeue_recv_chan::<T>,
491    }
492}
493
494/// Build a send [`SCase`] for use in [`selectgo`].
495///
496/// `val` must point to a `ManuallyDrop<T>` that outlives the `selectgo` call.
497/// If the case wins, the value is moved into the channel and the caller must
498/// **not** drop `*val`.  If the case loses, the caller must call
499/// `ManuallyDrop::drop(val)` to avoid a leak.
500///
501/// Called by the `select!` macro; not intended for direct use.
502#[doc(hidden)]
503pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
504    SCase {
505        chan_ptr:    Arc::as_ptr(tx.hchan()) as *const (),
506        sg:          ptr::null_mut(),
507        elem:        val as *mut u8,
508        lock_fn:     lock_chan::<T>,
509        unlock_fn:   unlock_chan::<T>,
510        try_fn:      try_send_chan::<T>,
511        enqueue_fn:  enqueue_send_chan::<T>,
512        dequeue_fn:  dequeue_send_chan::<T>,
513    }
514}
515
516// ---------------------------------------------------------------------------
517// Tests
518// ---------------------------------------------------------------------------
519
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::chan::{chan, Hchan};
    use crate::runtime::sched::run_impl;
    use std::ptr;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    // ─── SCase builders for Hchan<i32> ────────────────────────────────────────
    //
    // The builders wire up the *production* vtable functions
    // (`lock_chan::<i32>`, `try_send_chan::<i32>`, …) instead of test-local
    // copies, so every test below exercises the code that the `select!`
    // macro actually installs — including the `m.locks` bookkeeping done by
    // `lock_chan`/`unlock_chan`.

    /// Build an `SCase` for a send of `val` on channel `h`.
    ///
    /// `val` is a plain `i32`; the send vtable reads it through a
    /// `*mut ManuallyDrop<i32>`, which has the same layout.
    fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
        SCase {
            chan_ptr:   Arc::as_ptr(h) as *const (),
            sg:         ptr::null_mut(),
            elem:       val as *mut i32 as *mut u8,
            lock_fn:    lock_chan::<i32>,
            unlock_fn:  unlock_chan::<i32>,
            try_fn:     try_send_chan::<i32>,
            enqueue_fn: enqueue_send_chan::<i32>,
            dequeue_fn: dequeue_send_chan::<i32>,
        }
    }

    /// Build an `SCase` for a recv on channel `h`, output into `slot`.
    fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut Option<i32>) -> SCase {
        SCase {
            chan_ptr:   Arc::as_ptr(h) as *const (),
            sg:         ptr::null_mut(),
            elem:       slot as *mut Option<i32> as *mut u8,
            lock_fn:    lock_chan::<i32>,
            unlock_fn:  unlock_chan::<i32>,
            try_fn:     try_recv_chan::<i32>,
            enqueue_fn: enqueue_recv_chan::<i32>,
            dequeue_fn: dequeue_recv_chan::<i32>,
        }
    }

    // ── Fast-path tests (no goroutine park) ───────────────────────────────────

    /// select { rx.recv() => ... ; default } on a buffered channel with data.
    #[test]
    fn fast_recv_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            tx.send(42);

            let mut slot: Option<i32> = None;
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = selectgo(&mut cases, true);

            assert_eq!(idx, 0, "should pick recv case");
            assert!(ok,        "should be ok (not closed)");
            assert_eq!(slot.unwrap(), 42);
        });
    }

    /// select { tx.send(v) => ... ; default } on a channel with buffer space.
    #[test]
    fn fast_send_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);

            let mut val = 99_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, ok) = selectgo(&mut cases, true);

            assert_eq!(idx, 0);
            assert!(ok, "buffered send completes with ok=true");
            assert_eq!(rx.recv(), Some(99));
        });
    }

    /// select { ... ; default } when no case is ready → default taken.
    #[test]
    fn default_taken_when_not_ready() {
        run_impl(|| {
            let (_tx, rx) = chan::<i32>(0);

            let mut slot: Option<i32> = None;
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = selectgo(&mut cases, true);

            assert_eq!(idx, CASE_DEFAULT);
            assert!(!ok);
        });
    }

    /// select recv on closed+empty channel returns ok=false.
    #[test]
    fn recv_closed_empty() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);
            tx.close();

            let mut slot: Option<i32> = None;
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = selectgo(&mut cases, false);

            assert_eq!(idx, 0);
            assert!(!ok, "recv from closed returns ok=false");
            assert!(slot.is_none(), "closed recv slot must stay None");
        });
    }

    // ── Multi-case selection ──────────────────────────────────────────────────

    /// Two recv cases; only one channel has data — that case wins.
    #[test]
    fn multi_case_first_ready_wins() {
        run_impl(|| {
            let (tx1, rx1) = chan::<i32>(1);
            let (_tx2, rx2) = chan::<i32>(1);

            tx1.send(7);

            let mut s1: Option<i32> = None;
            let mut s2: Option<i32> = None;
            let mut cases = [
                recv_case(rx1.hchan(), &mut s1),
                recv_case(rx2.hchan(), &mut s2),
            ];
            let (idx, ok) = selectgo(&mut cases, false);

            assert_eq!(idx, 0);
            assert!(ok);
            assert_eq!(s1.unwrap(), 7);
        });
    }

    // ── Blocking path tests (goroutine park/unpark) ───────────────────────────

    /// Goroutine blocks on select recv, then a sender unblocks it.
    #[test]
    fn blocking_recv_unblocked_by_send() {
        use crate::runtime::sched::spawn_goroutine;

        let result = Arc::new(AtomicI32::new(-1));
        let result2 = Arc::clone(&result);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(0);

            spawn_goroutine(move || {
                // Sender: yield once so the selecting goroutine gets a
                // chance to park first, then send.
                crate::gosched();
                tx.send(55);
            });

            let mut slot: Option<i32> = None;
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            // No default → will block.
            let (idx, ok) = selectgo(&mut cases, false);

            assert_eq!(idx, 0);
            assert!(ok);
            result2.store(slot.unwrap(), Ordering::Relaxed);
        });

        assert_eq!(result.load(Ordering::Acquire), 55);
    }

    /// Goroutine blocks on select send, then a receiver unblocks it.
    #[test]
    fn blocking_send_unblocked_by_recv() {
        use crate::runtime::sched::spawn_goroutine;

        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);

            spawn_goroutine(move || {
                crate::gosched();
                // Consume the value the select sends.
                let _ = rx.recv();
            });

            let mut val = 77_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, _ok) = selectgo(&mut cases, false);

            assert_eq!(idx, 0);
        });
    }

    /// Two goroutines racing on the same channel; exactly one wins via select.
    #[test]
    fn select_race_one_winner() {
        use crate::runtime::sched::spawn_goroutine;

        let wins = Arc::new(AtomicI32::new(0));
        let wins2 = Arc::clone(&wins);
        let wins3 = Arc::clone(&wins);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(1);
            tx.send(1); // one value in the buffer

            spawn_goroutine({
                let wins = Arc::clone(&wins2);
                let rx = rx.clone();
                move || {
                    let mut slot: Option<i32> = None;
                    let mut cases = [recv_case(rx.hchan(), &mut slot)];
                    let (idx, ok) = selectgo(&mut cases, true);
                    if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
                }
            });

            spawn_goroutine({
                let wins = Arc::clone(&wins3);
                let rx = rx.clone();
                move || {
                    let mut slot: Option<i32> = None;
                    let mut cases = [recv_case(rx.hchan(), &mut slot)];
                    let (idx, ok) = selectgo(&mut cases, true);
                    if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
                }
            });

            // Give goroutines time to race.
            for _ in 0..200 { crate::gosched(); }
        });

        // Exactly one goroutine should have received the value.
        assert_eq!(wins.load(Ordering::Acquire), 1);
    }
}
849
850        // Exactly one goroutine should have received the value.
851        assert_eq!(wins.load(Ordering::Acquire), 1);
852    }
853}