Skip to main content

go_lib/
select.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `selectgo` — the runtime heart of `select { }`.
3//!
4//! Ported from `runtime/select.go`.
5//!
6//! ## How it works
7//!
8//! `selectgo` receives a slice of [`SCase`]s and an optional `has_default`
9//! flag and picks the first case that can proceed without blocking.
10//!
11//! ```text
12//! 1. Build pollorder  — a random permutation of case indices (fairness).
13//! 2. Build lockorder  — case indices sorted by channel address (deadlock prevention).
14//! 3. Acquire all channel locks in lockorder.
15//! 4. First pass (pollorder): check each case for immediate readiness.
16//!    – buffer op:    perform it, release all locks, return winner.
17//!    – direct handoff (partner waiting): dequeue partner's sudog, perform op,
18//!      release all locks, call goready(partner), return winner.
19//!    – send on closed: release all locks, panic.
20//! 5. If has_default: release all locks, return (CASE_DEFAULT, false).
21//! 6. Blocking path:
22//!    a. For every case, allocate a sudog (is_select=true) and enqueue it.
23//!    b. Reset G.selectdone to 0 and G.param to null.
24//!    c. gopark_commit(Select) — park while still holding ALL channel locks;
25//!       they are released on g0 only after the goroutine reaches GWAITING
26//!       (commit-park protocol, closes the SIGURG lost-wakeup window).
27//! 7. On wakeup (winner wrote G.param = winning sudog):
28//!    a. Acquire all locks in lockorder.
29//!    b. Dequeue all *losing* sudogs (dequeue_sudog is a no-op if a racing
30//!       channel op already removed them).
31//!    c. Release all locks.
32//!    d. Release all sudogs back to the free list.
33//!    e. Return (winner_index, ok).
34//! ```
35//!
36//! ## Type erasure
37//!
38//! Channels are generic (`Hchan<T>`) but `selectgo` must operate over a
39//! heterogeneous set of them.  Each [`SCase`] carries four function pointers
40//! that are monomorphised at the call site (by the `select!` macro):
41//!
42//! | pointer       | purpose                                        |
43//! |---------------|------------------------------------------------|
44//! | `lock_fn`     | acquire the channel's `RawMutex`               |
45//! | `unlock_fn`   | release the channel's `RawMutex`               |
46//! | `try_fn`      | attempt the channel op while all locks held    |
47//! | `enqueue_fn`  | enqueue a sudog on the channel's wait queue    |
48//! | `dequeue_fn`  | remove a specific sudog (O(1) cleanup)         |
49//!
50//! `chan_ptr` is the type-erased `*const Hchan<T>` used as the channel
51//! identity for deduplication and address-ordered locking.
52//!
53//! ## Sentinel index
54//!
55//! `selectgo` returns `CASE_DEFAULT` (`usize::MAX`) when the default case is
56//! taken.  Channel cases use their 0-based index within the slice.
57
58use std::mem::ManuallyDrop;
59use std::ptr;
60use std::sync::atomic::Ordering;
61use std::sync::Arc;
62
63use crate::chan::{Hchan, Receiver, Sender};
64use crate::runtime::g::{current_g, G, WaitReason};
65use crate::runtime::park::{gopark_commit, goready};
66use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
67
68// ---------------------------------------------------------------------------
69// Public constants
70// ---------------------------------------------------------------------------
71
72/// Return value from [`selectgo`] when the default case is taken.
73pub const CASE_DEFAULT: usize = usize::MAX;
74
75// ---------------------------------------------------------------------------
76// TryResult — outcome of a single case's fast-path attempt
77// ---------------------------------------------------------------------------
78
79/// The result of attempting a channel case while all locks are held.
80#[derive(Debug)]
81pub(crate) enum TryResult {
82    /// Case is not immediately satisfiable.
83    NotReady,
84
85    /// Case completed via a buffer read/write.
86    /// `ok`: true for a normal value, false for a closed-channel receive.
87    Done { ok: bool },
88
89    /// Case completed via a direct goroutine-to-goroutine handoff.
90    /// The partner goroutine has been set up (`param` set, `success` set) but
91    /// not yet made runnable.  Caller must call `goready(gp)` after releasing
92    /// all locks.
93    Handoff { gp: *mut G, ok: bool },
94
95    /// Send attempted on a closed channel.  Caller must release all locks and
96    /// then `panic!("send on closed channel")`.
97    ClosedSend,
98}
99
100// SAFETY: TryResult is only ever used in a single goroutine between lock
101// acquire and lock release; the raw *mut G is not shared across threads.
102unsafe impl Send for TryResult {}
103
104// ---------------------------------------------------------------------------
105// SCase — one arm of a select statement
106// ---------------------------------------------------------------------------
107
108/// One arm of a `select` statement (send, receive, or default).
109///
110/// Constructed by [`recv_case_of`] / [`send_case_of`]; do not build directly.
111#[doc(hidden)]
112pub struct SCase {
113    /// Type-erased `*const Hchan<T>`.  Used as the channel identity for
114    /// deduplication and address-ordered locking.  `null` for a default arm.
115    pub(crate) chan_ptr: *const (),
116
117    /// The sudog enrolled on this channel while the goroutine is parked.
118    /// Set by `selectgo` in the blocking path; `null` for default and
119    /// fast-path returns.
120    pub(crate) sg: *mut Sudog,
121
122    /// Type-erased value pointer.
123    ///
124    /// **Send**: `*mut ManuallyDrop<T>` — the value to send (read by the fn pointers).
125    /// **Recv**: `*mut Option<T>` — output slot; written as `Some(val)` on a
126    ///           successful receive, left as `None` when the channel is closed.
127    /// **Default**: `null`.
128    pub(crate) elem: *mut u8,
129
130    // ─── vtable — filled in by select! macro ──────────────────────────────
131
132    /// Acquire the channel's lock.
133    pub(crate) lock_fn: unsafe fn(*const ()),
134
135    /// Release the channel's lock.
136    pub(crate) unlock_fn: unsafe fn(*const ()),
137
138    /// Try the channel operation while all locks are held.
139    ///
140    /// Signature: `(chan_ptr, elem) -> TryResult`
141    ///
142    /// For a send case, `elem` is `*mut ManuallyDrop<T>` (the value to send).
143    /// For a recv case, `elem` is `*mut Option<T>` (the output slot).
144    pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,
145
146    /// Enqueue `sg` on the channel's sendq or recvq (under the lock).
147    pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),
148
149    /// Remove `sg` from the channel's sendq or recvq (under the lock).
150    /// No-op if `sg` was already removed by a racing channel operation.
151    pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
152}
153
154// SAFETY: SCase is always used within a single goroutine context; the raw
155// pointers are only shared via the scheduler under goroutine-exclusion.
156unsafe impl Send for SCase {}
157
158// ---------------------------------------------------------------------------
159// Lehmer RNG — tiny PRNG for poll-order shuffling
160// ---------------------------------------------------------------------------
161
162/// A Lehmer (Park–Miller) multiplicative congruential PRNG.
163///
164/// Used only to produce the random poll order; cryptographic quality is
165/// not required.  Seeded from the current goroutine's `goid`.
166struct Lehmer(u64);
167
168impl Lehmer {
169    fn from_goid() -> Self {
170        let gp = current_g();
171        // SAFETY: gp is only dereferenced after the null check.
172        let goid = if gp.is_null() { 1 } else { (unsafe { (*gp).goid }) | 1 };
173        Lehmer(goid | 1) // must be odd and non-zero
174    }
175
176    /// Return a pseudo-random value in `[0, n)`.
177    fn next_usize(&mut self, n: usize) -> usize {
178        // 64-bit Lehmer with multiplier from Knuth TAOCP Vol 2 §3.3.4.
179        self.0 = self.0.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
180        ((self.0 >> 33) as usize) % n
181    }
182}
183
184// ---------------------------------------------------------------------------
185// Commit-park unlock shim
186// ---------------------------------------------------------------------------
187
188/// Everything `unlock_select_chans` needs to release every channel lock held
189/// by a parked `selectgo`, from g0, after the goroutine is `GWAITING`.
190///
191/// Lives on the selecting goroutine's stack across the park.  That stack stays
192/// mapped while the goroutine is `GWAITING` (and until `park_fn` finishes and
193/// releases the locks here), so the raw pointers remain valid for the single
194/// shim call.
195struct SelectParkCtx {
196    /// `cases.as_ptr()` — the select's case slice base.
197    cases:     *const SCase,
198    /// `cases.len()`.
199    n:         usize,
200    /// `lockorder.as_ptr()` — address-sorted, deduped case indices.
201    lockorder: *const usize,
202    /// `lockorder.len()`.
203    nlock:     usize,
204}
205
206/// `gopark_commit` unlock shim for `selectgo`: release ALL of the select's
207/// channel locks (in lockorder) from g0 after the parking goroutine has
208/// reached `GWAITING`.  Each `unlock_fn` is `unlock_chan`, which also drops
209/// the `m.locks` increment that the matching `lock_chan` left held.
210///
211/// # Safety
212/// `arg` must point to a live [`SelectParkCtx`] whose channel locks are all
213/// held by the parking goroutine.
214unsafe fn unlock_select_chans(arg: *mut u8) {
215    let ctx       = unsafe { &*(arg as *const SelectParkCtx) };
216    let cases     = unsafe { std::slice::from_raw_parts(ctx.cases, ctx.n) };
217    let lockorder = unsafe { std::slice::from_raw_parts(ctx.lockorder, ctx.nlock) };
218    for &i in lockorder {
219        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
220    }
221}
222
223// ---------------------------------------------------------------------------
224// selectgo
225// ---------------------------------------------------------------------------
226
227/// Run a `select` over the given cases.
228///
229/// `cases` must contain only **channel** cases (send or receive); pass
230/// `has_default = true` if the select has a `default` arm.
231///
232/// Returns `(chosen_index, received_ok)` where:
233/// - `chosen_index` is the 0-based index into `cases`, or [`CASE_DEFAULT`] if
234///   the default arm was taken.
235/// - `received_ok` is `true` for a normal channel recv, `false` if the
236///   channel was closed (and the receive wrote `None` into the slot).  Always
237///   `false` for send/default arms.
238///
239/// # Preconditions
240///
241/// - All `SCase` values must be created by [`recv_case_of`] or [`send_case_of`].
242/// - Must be called from a goroutine stack (not g0 or a bare OS thread).
243///   A `debug_assert` fires in debug builds if this is violated.
244///
245/// This function is intended only for use by the `select!` macro.
246#[doc(hidden)]
247pub fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
248    let n = cases.len();
249
250    // ── 1. Build pollorder (random permutation) ───────────────────────────────
251    let mut pollorder: Vec<usize> = (0..n).collect();
252    let mut rng = Lehmer::from_goid();
253    // Fisher-Yates shuffle.
254    for i in (1..n).rev() {
255        let j = rng.next_usize(i + 1);
256        pollorder.swap(i, j);
257    }
258
259    // ── 2. Build lockorder (sorted by channel address; dedup same channel) ────
260    let mut lockorder: Vec<usize> = (0..n).collect();
261    lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
262    // Deduplicate consecutive equal channels so we don't double-lock.
263    lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);
264
265    // ── 3. Acquire all locks ──────────────────────────────────────────────────
266    for &i in &lockorder {
267        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
268    }
269
270    // ── 4. First pass: check each case in poll order ──────────────────────────
271    //
272    // `try_fn` dequeues a peer's sudog and reads/writes its `elem` (which, for
273    // a select peer, points into that peer's `selectgo` stack frame) entirely
274    // under the channel locks acquired in step 3.  Those locks are not
275    // released until the `unlock_fn` loop below, *after* the elem access — and
276    // the peer cannot resume and unwind that `selectgo` frame until it dequeues
277    // its own sudog, which needs the same channel lock.  So the peer's stack
278    // stays valid while we hold the lock.  The G descriptor is reused via the
279    // gFree pool (never freed while parked), so the `gp` we hand to `goready`
280    // is always valid.
281    for &i in &pollorder {
282        let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
283        match result {
284            TryResult::NotReady => continue,
285
286            TryResult::Done { ok } => {
287                // Buffer op completed under the locks; release all and return.
288                for &j in &lockorder {
289                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
290                }
291                return (i, ok);
292            }
293
294            TryResult::Handoff { gp, ok } => {
295                // Partner dequeued and set up; release locks, wake partner.
296                for &j in &lockorder {
297                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
298                }
299                unsafe { goready(gp) };
300                return (i, ok);
301            }
302
303            TryResult::ClosedSend => {
304                for &j in &lockorder {
305                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
306                }
307                panic!("send on closed channel");
308            }
309        }
310    }
311
312    // ── 5. Default case ───────────────────────────────────────────────────────
313    if has_default {
314        for &i in &lockorder {
315            unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
316        }
317        return (CASE_DEFAULT, false);
318    }
319
320    // ── 6. Blocking path: enqueue sudogs on all channels ─────────────────────
321    let gp = current_g();
322    debug_assert!(!gp.is_null(), "selectgo: called from g0");
323
324    for case in cases.iter_mut() {
325        let sg = acquire_sudog();
326        unsafe {
327            (*sg).g         = gp;
328            (*sg).elem      = case.elem;
329            (*sg).is_select = true;
330            (*sg).success   = false;
331            (*sg).c         = case.chan_ptr as *mut u8;
332        }
333        case.sg = sg;
334        unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
335    }
336
337    // Reset selectdone so this goroutine can be claimed by exactly one case.
338    unsafe { (*gp).selectdone.store(0, Ordering::Release) };
339    unsafe { (*gp).param = ptr::null_mut() };
340
341    // ── 6c. Commit-park: hold all channel locks across the park ───────────────
342    //
343    // Releasing the locks *before* gopark (the old design) opened a
344    // lost-wakeup window: between the unlock loop and the park an async
345    // preemption (SIGURG) could move this selecting goroutine to
346    // GRUNNABLE+queued; a peer completing one of our cases would then call
347    // goready, see the non-GWAITING status and return early, so when we
348    // finally parked (GWAITING) nobody would wake us → hang.  Instead we keep
349    // every channel lock held across the `mcall` and release them all on g0,
350    // via `unlock_select_chans`, only after `park_fn` has committed us to
351    // GWAITING.  A peer cannot dequeue our sudog (that needs the channel
352    // lock) until the park is committed.
353    //
354    // m.locks accounting: each `lock_chan` left one `m.locks` increment held
355    // (it `mem::forget`s its MLockGuard), so `m.locks` is currently elevated
356    // by `lockorder.len()`; the shim's matching `unlock_chan` calls remove
357    // exactly those.  `gopark_commit`/`park_fn` additionally do one
358    // unconditional `m.locks -= 1` for the handoff, so we add one extra
359    // increment here for park_fn to balance.
360    let ctx = SelectParkCtx {
361        cases:     cases.as_ptr(),
362        n:         cases.len(),
363        lockorder: lockorder.as_ptr(),
364        nlock:     lockorder.len(),
365    };
366    std::mem::forget(crate::runtime::m::m_lock());
367    unsafe {
368        gopark_commit(
369            WaitReason::Select,
370            unlock_select_chans,
371            &ctx as *const SelectParkCtx as *mut u8,
372        );
373    }
374
375    // ── 7. Woken: find winner, clean up losers ────────────────────────────────
376    //
377    // The winning channel operation stored the winning sudog in G.param.
378    let sg_winner = unsafe { (*gp).param as *mut Sudog };
379    unsafe { (*gp).param = ptr::null_mut() };
380    let ok = unsafe { (*sg_winner).success };
381
382    // Identify which case won.
383    let winner = cases
384        .iter()
385        .position(|c| c.sg == sg_winner)
386        .expect("selectgo: winning sudog not found in cases");
387
388    // 7a. Re-acquire all locks.
389    for &i in &lockorder {
390        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
391    }
392
393    // 7b. Dequeue all losing sudogs from their channels.
394    for (i, case) in cases.iter_mut().enumerate() {
395        if i == winner { continue; }
396        let sg = case.sg;
397        unsafe { (case.dequeue_fn)(case.chan_ptr, sg) };
398    }
399
400    // 7c. Release all locks.
401    for &i in &lockorder {
402        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
403    }
404
405    // 7d. Release all sudogs back to the pool.
406    for case in cases.iter_mut() {
407        let sg = case.sg;
408        case.sg = ptr::null_mut();
409        unsafe {
410            (*sg).g    = ptr::null_mut();
411            (*sg).elem = ptr::null_mut();
412            (*sg).c    = ptr::null_mut();
413            release_sudog(sg);
414        }
415    }
416
417    (winner, ok)
418}
419
420// ---------------------------------------------------------------------------
421// Generic vtable functions — monomorphised for each T at the call site
422// ---------------------------------------------------------------------------
423
424pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
425    // Suppress SIGURG-driven async preemption while the channel spinlock is
426    // held — same rationale as `LockGuard::new`.  `selectgo` may hold several
427    // chan locks at once; each `lock_chan`/`unlock_chan` pair bumps and then
428    // decrements `m.locks`, so the counter is back to zero once all locks
429    // are released.
430    std::mem::forget(crate::runtime::m::m_lock());
431    (*(p as *const Hchan<T>)).mutex.lock();
432}
433
434pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
435    (*(p as *const Hchan<T>)).mutex.unlock();
436    // Manual `m.locks -= 1` to match the `mem::forget`ed guard in
437    // `lock_chan`.  We avoid constructing/dropping an MLockGuard here
438    // because the lock/unlock are split across two separate functions.
439    let mp = crate::runtime::m::current_m();
440    if !mp.is_null() {
441        (*mp).locks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
442    }
443}
444
445pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
446    p: *const (),
447    elem: *mut u8,
448) -> TryResult {
449    let hchan = &*(p as *const Hchan<T>);
450    let state = &mut *hchan.state.get();
451
452    if state.closed {
453        return TryResult::ClosedSend;
454    }
455
456    // Waiting receiver?
457    let recv_sg = state.recvq.dequeue();
458    if !recv_sg.is_null() {
459        let gp = (*recv_sg).g;
460        // elem is *mut ManuallyDrop<T> (send slot); recv_sg.elem is *mut Option<T>.
461        let ep = (*recv_sg).elem as *mut Option<T>;
462        if !ep.is_null() {
463            *ep = Some(ptr::read(elem as *const T));
464        }
465        (*recv_sg).success = true;
466        (*gp).param        = recv_sg as *mut u8;
467        return TryResult::Handoff { gp, ok: true };
468    }
469
470    // Buffer space?
471    if state.buf.len() < state.cap {
472        state.buf.push_back(ptr::read(elem as *const T));
473        return TryResult::Done { ok: true };
474    }
475
476    TryResult::NotReady
477}
478
479pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
480    p: *const (),
481    elem: *mut u8, // *mut Option<T>
482) -> TryResult {
483    let hchan = &*(p as *const Hchan<T>);
484    let state = &mut *hchan.state.get();
485
486    // Waiting sender?
487    let send_sg = state.sendq.dequeue();
488    if !send_sg.is_null() {
489        let gp    = (*send_sg).g;
490        // send_sg.elem is *mut ManuallyDrop<T>; use ManuallyDrop explicitly so
491        // Box::from_raw for the boxed path does not run T's destructor.
492        let ep    = (*send_sg).elem as *mut ManuallyDrop<T>;
493        let boxed = (*send_sg).boxed_elem;
494        let val = if state.cap == 0 {
495            let v = ManuallyDrop::into_inner(ptr::read(ep));
496            if boxed { let _ = Box::from_raw(ep); }
497            (*send_sg).elem = ptr::null_mut();
498            v
499        } else {
500            let head = state.buf.pop_front().unwrap();
501            let sv   = ManuallyDrop::into_inner(ptr::read(ep));
502            if boxed { let _ = Box::from_raw(ep); }
503            (*send_sg).elem = ptr::null_mut();
504            state.buf.push_back(sv);
505            head
506        };
507        *(elem as *mut Option<T>) = Some(val);
508        (*send_sg).success = true;
509        (*gp).param        = send_sg as *mut u8;
510        return TryResult::Handoff { gp, ok: true };
511    }
512
513    // Buffer data?
514    if !state.buf.is_empty() {
515        let val = state.buf.pop_front().unwrap();
516        *(elem as *mut Option<T>) = Some(val);
517        return TryResult::Done { ok: true };
518    }
519
520    // Closed and empty → elem stays None; caller checks ok=false.
521    if state.closed {
522        return TryResult::Done { ok: false };
523    }
524
525    TryResult::NotReady
526}
527
528pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
529    let hchan = &*(p as *const Hchan<T>);
530    (*hchan.state.get()).sendq.enqueue(sg);
531}
532
533pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
534    let hchan = &*(p as *const Hchan<T>);
535    (*hchan.state.get()).recvq.enqueue(sg);
536}
537
538pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
539    let hchan = &*(p as *const Hchan<T>);
540    (*hchan.state.get()).sendq.dequeue_sudog(sg);
541}
542
543pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
544    let hchan = &*(p as *const Hchan<T>);
545    (*hchan.state.get()).recvq.dequeue_sudog(sg);
546}
547
548// ---------------------------------------------------------------------------
549// Public factory functions — used by the select! macro
550// ---------------------------------------------------------------------------
551
552/// Build a receive [`SCase`] for use in [`selectgo`].
553///
554/// `slot` must point to an `Option<T>` initialised to `None` that outlives the
555/// `selectgo` call.  On a successful receive (`ok = true`) the slot is
556/// `Some(value)`; on `ok = false` (channel closed) the slot remains `None`.
557///
558/// Called by the `select!` macro; not intended for direct use.
559#[doc(hidden)]
560pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut Option<T>) -> SCase {
561    SCase {
562        chan_ptr:    Arc::as_ptr(rx.hchan()) as *const (),
563        sg:          ptr::null_mut(),
564        elem:        slot as *mut u8,
565        lock_fn:     lock_chan::<T>,
566        unlock_fn:   unlock_chan::<T>,
567        try_fn:      try_recv_chan::<T>,
568        enqueue_fn:  enqueue_recv_chan::<T>,
569        dequeue_fn:  dequeue_recv_chan::<T>,
570    }
571}
572
573/// Build a send [`SCase`] for use in [`selectgo`].
574///
575/// `val` must point to a `ManuallyDrop<T>` that outlives the `selectgo` call.
576/// If the case wins, the value is moved into the channel and the caller must
577/// **not** drop `*val`.  If the case loses, the caller must call
578/// `ManuallyDrop::drop(val)` to avoid a leak.
579///
580/// Called by the `select!` macro; not intended for direct use.
581#[doc(hidden)]
582pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
583    SCase {
584        chan_ptr:    Arc::as_ptr(tx.hchan()) as *const (),
585        sg:          ptr::null_mut(),
586        elem:        val as *mut u8,
587        lock_fn:     lock_chan::<T>,
588        unlock_fn:   unlock_chan::<T>,
589        try_fn:      try_send_chan::<T>,
590        enqueue_fn:  enqueue_send_chan::<T>,
591        dequeue_fn:  dequeue_send_chan::<T>,
592    }
593}
594
595// ---------------------------------------------------------------------------
596// Tests
597// ---------------------------------------------------------------------------
598
599#[cfg(all(test, not(loom)))]
600mod tests {
601    use super::*;
602    use crate::chan::{chan, Hchan};
603    use crate::runtime::sudog::Sudog;
604    use std::ptr;
605    use std::sync::atomic::{AtomicI32, Ordering};
606    use std::sync::Arc;
607
608    // ─── vtable helpers for Hchan<i32> ────────────────────────────────────────
609
610    unsafe fn lock_i32(p: *const ()) {
611        (*(p as *const Hchan<i32>)).mutex.lock();
612    }
613    unsafe fn unlock_i32(p: *const ()) {
614        unsafe { (*(p as *const Hchan<i32>)).mutex.unlock() };
615    }
616
617    /// try_fn for a **send** case on `Hchan<i32>`.
618    ///
619    /// `elem` points to a `i32` to send.  Checks recvq and buffer.
620    unsafe fn try_send_i32(p: *const (), elem: *mut u8) -> TryResult {
621        let hchan = &*(p as *const Hchan<i32>);
622        // SAFETY: caller holds the channel lock.
623        let state = &mut *hchan.state.get();
624
625        if state.closed {
626            return TryResult::ClosedSend;
627        }
628
629        // Waiting receiver?
630        let recv_sg = state.recvq.dequeue();
631        if !recv_sg.is_null() {
632            let gp  = (*recv_sg).g;
633            let ep  = (*recv_sg).elem as *mut Option<i32>;
634            if !ep.is_null() {
635                *ep = Some(ptr::read(elem as *const i32));
636            }
637            (*recv_sg).success = true;
638            (*gp).param        = recv_sg as *mut u8;
639            return TryResult::Handoff { gp, ok: true };
640        }
641
642        // Buffer space?
643        if state.buf.len() < state.cap {
644            state.buf.push_back(ptr::read(elem as *const i32));
645            return TryResult::Done { ok: true };
646        }
647
648        TryResult::NotReady
649    }
650
651    /// try_fn for a **recv** case on `Hchan<i32>`.
652    ///
653    /// `elem` points to an `Option<i32>` output slot (initialised to `None`).
654    unsafe fn try_recv_i32(p: *const (), elem: *mut u8) -> TryResult {
655        let hchan = &*(p as *const Hchan<i32>);
656        let state = &mut *hchan.state.get();
657
658        // Waiting sender?
659        let send_sg = state.sendq.dequeue();
660        if !send_sg.is_null() {
661            let gp    = (*send_sg).g;
662            let ep    = (*send_sg).elem as *mut ManuallyDrop<i32>;
663            let boxed = (*send_sg).boxed_elem;
664            let val = if state.cap == 0 {
665                let v = ManuallyDrop::into_inner(ptr::read(ep));
666                if boxed { let _ = Box::from_raw(ep); }
667                (*send_sg).elem = ptr::null_mut();
668                v
669            } else {
670                let head = state.buf.pop_front().unwrap();
671                let sv   = ManuallyDrop::into_inner(ptr::read(ep));
672                if boxed { let _ = Box::from_raw(ep); }
673                (*send_sg).elem = ptr::null_mut();
674                state.buf.push_back(sv);
675                head
676            };
677            *(elem as *mut Option<i32>) = Some(val);
678            (*send_sg).success = true;
679            (*gp).param        = send_sg as *mut u8;
680            return TryResult::Handoff { gp, ok: true };
681        }
682
683        // Buffer has data?
684        if !state.buf.is_empty() {
685            let val = state.buf.pop_front().unwrap();
686            *(elem as *mut Option<i32>) = Some(val);
687            return TryResult::Done { ok: true };
688        }
689
690        // Closed and empty → elem stays None.
691        if state.closed {
692            return TryResult::Done { ok: false };
693        }
694
695        TryResult::NotReady
696    }
697
698    unsafe fn enqueue_send_i32(p: *const (), sg: *mut Sudog) {
699        let hchan = &*(p as *const Hchan<i32>);
700        (*hchan.state.get()).sendq.enqueue(sg);
701    }
702    unsafe fn enqueue_recv_i32(p: *const (), sg: *mut Sudog) {
703        let hchan = &*(p as *const Hchan<i32>);
704        (*hchan.state.get()).recvq.enqueue(sg);
705    }
706    unsafe fn dequeue_send_sg_i32(p: *const (), sg: *mut Sudog) {
707        let hchan = &*(p as *const Hchan<i32>);
708        (*hchan.state.get()).sendq.dequeue_sudog(sg);
709    }
710    unsafe fn dequeue_recv_sg_i32(p: *const (), sg: *mut Sudog) {
711        let hchan = &*(p as *const Hchan<i32>);
712        (*hchan.state.get()).recvq.dequeue_sudog(sg);
713    }
714
715    /// Build an `SCase` for a buffered-send of `val` on channel `h`.
716    fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
717        SCase {
718            chan_ptr:   Arc::as_ptr(h) as *const (),
719            sg:        ptr::null_mut(),
720            elem:      val as *mut i32 as *mut u8,
721            lock_fn:   lock_i32,
722            unlock_fn: unlock_i32,
723            try_fn:    try_send_i32,
724            enqueue_fn: enqueue_send_i32,
725            dequeue_fn: dequeue_send_sg_i32,
726        }
727    }
728
729    /// Build an `SCase` for a recv on channel `h`, output into `slot`.
730    fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut Option<i32>) -> SCase {
731        SCase {
732            chan_ptr:   Arc::as_ptr(h) as *const (),
733            sg:        ptr::null_mut(),
734            elem:      slot as *mut Option<i32> as *mut u8,
735            lock_fn:   lock_i32,
736            unlock_fn: unlock_i32,
737            try_fn:    try_recv_i32,
738            enqueue_fn: enqueue_recv_i32,
739            dequeue_fn: dequeue_recv_sg_i32,
740        }
741    }
742
743    // ── Fast-path tests (no goroutine park) ───────────────────────────────────
744
745    /// select { rx.recv() => ... ; default } on a buffered channel with data.
746    #[test]
747    #[go_lib::main]
748    fn fast_recv_buffered() {
749        let (tx, rx) = chan::<i32>(4);
750        tx.send(42);
751
752        let mut slot: Option<i32> = None;
753        let mut cases = [recv_case(rx.hchan(), &mut slot)];
754        let (idx, ok) = selectgo(&mut cases, true);
755
756        assert_eq!(idx, 0, "should pick recv case");
757        assert!(ok,        "should be ok (not closed)");
758        assert_eq!(slot.unwrap(), 42);
759    }
760
761    /// select { tx.send(v) => ... ; default } on a channel with buffer space.
762    #[test]
763    #[go_lib::main]
764    fn fast_send_buffered() {
765        let (tx, rx) = chan::<i32>(4);
766
767        let mut val = 99_i32;
768        let mut cases = [send_case(tx.hchan(), &mut val)];
769        let (idx, ok) = selectgo(&mut cases, true);
770
771        assert_eq!(idx, 0);
772        assert!(ok, "buffered send completes with ok=true");
773        assert_eq!(rx.recv(), Some(99));
774    }
775
776    /// select { ... ; default } when no case is ready → default taken.
777    #[test]
778    #[go_lib::main]
779    fn default_taken_when_not_ready() {
780        let (_tx, rx) = chan::<i32>(0);
781
782        let mut slot: Option<i32> = None;
783        let mut cases = [recv_case(rx.hchan(), &mut slot)];
784        let (idx, ok) = selectgo(&mut cases, true);
785
786        assert_eq!(idx, CASE_DEFAULT);
787        assert!(!ok);
788    }
789
790    /// select recv on closed+empty channel returns ok=false.
791    #[test]
792    #[go_lib::main]
793    fn recv_closed_empty() {
794        let (tx, rx) = chan::<i32>(0);
795        tx.close();
796
797        let mut slot: Option<i32> = None;
798        let mut cases = [recv_case(rx.hchan(), &mut slot)];
799        let (idx, ok) = selectgo(&mut cases, false);
800
801        assert_eq!(idx, 0);
802        assert!(!ok, "recv from closed returns ok=false");
803        assert!(slot.is_none(), "closed recv slot must stay None");
804    }
805
806    // ── Multi-case selection ──────────────────────────────────────────────────
807
808    /// Two recv cases; only one channel has data — that case wins.
809    #[test]
810    #[go_lib::main]
811    fn multi_case_first_ready_wins() {
812        let (tx1, rx1) = chan::<i32>(1);
813        let (_tx2, rx2) = chan::<i32>(1);
814
815        tx1.send(7);
816
817        let mut s1: Option<i32> = None;
818        let mut s2: Option<i32> = None;
819        let mut cases = [
820            recv_case(rx1.hchan(), &mut s1),
821            recv_case(rx2.hchan(), &mut s2),
822        ];
823        let (idx, ok) = selectgo(&mut cases, false);
824
825        assert_eq!(idx, 0);
826        assert!(ok);
827        assert_eq!(s1.unwrap(), 7);
828    }
829
830    // ── Blocking path tests (goroutine park/unpark) ───────────────────────────
831
832    /// Goroutine blocks on select recv, then a sender unblocks it.
833    #[test]
834    #[go_lib::main]
835    fn blocking_recv_unblocked_by_send() {
836        use crate::runtime::sched::spawn_goroutine;
837
838        let result = Arc::new(AtomicI32::new(-1));
839        let result2 = Arc::clone(&result);
840
841        let (tx, rx) = chan::<i32>(0);
842
843        spawn_goroutine(move || {
844            // Sender: wait a bit, then send.
845            crate::gosched();
846            tx.send(55);
847        });
848
849        let mut slot: Option<i32> = None;
850        let mut cases = [recv_case(rx.hchan(), &mut slot)];
851        // No default → will block.
852        let (idx, ok) = selectgo(&mut cases, false);
853
854        assert_eq!(idx, 0);
855        assert!(ok);
856        result2.store(slot.unwrap(), Ordering::Relaxed);
857
858        assert_eq!(result.load(Ordering::Acquire), 55);
859    }
860
861    /// Goroutine blocks on select send, then a receiver unblocks it.
862    #[test]
863    #[go_lib::main]
864    fn blocking_send_unblocked_by_recv() {
865        use crate::runtime::sched::spawn_goroutine;
866
867        let (tx, rx) = chan::<i32>(0);
868
869        spawn_goroutine(move || {
870            crate::gosched();
871            // Consume the value the select sends.
872            let _ = rx.recv();
873        });
874
875        let mut val = 77_i32;
876        let mut cases = [send_case(tx.hchan(), &mut val)];
877        let (idx, _ok) = selectgo(&mut cases, false);
878
879        assert_eq!(idx, 0);
880    }
881
882    /// Two goroutines racing on the same channel; exactly one wins via select.
883    #[test]
884    #[go_lib::main]
885    fn select_race_one_winner() {
886        use crate::runtime::sched::spawn_goroutine;
887
888        let wins = Arc::new(AtomicI32::new(0));
889        let wins2 = Arc::clone(&wins);
890        let wins3 = Arc::clone(&wins);
891        let wins4 = Arc::clone(&wins);
892
893        let (tx, rx) = chan::<i32>(1);
894        tx.send(1); // one value in the buffer
895
896        spawn_goroutine({
897            let wins = Arc::clone(&wins2);
898            let rx = rx.clone();
899            move || {
900                let mut slot: Option<i32> = None;
901                let mut cases = [recv_case(rx.hchan(), &mut slot)];
902                let (idx, ok) = selectgo(&mut cases, true);
903                if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
904            }
905        });
906
907        spawn_goroutine({
908            let wins = Arc::clone(&wins3);
909            let rx = rx.clone();
910            move || {
911                let mut slot: Option<i32> = None;
912                let mut cases = [recv_case(rx.hchan(), &mut slot)];
913                let (idx, ok) = selectgo(&mut cases, true);
914                if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
915            }
916        });
917
918        // Poll on the atomic with a wall-clock deadline so the test is
919        // robust to per-goroutine startup latency (a one-shot stack
920        // pre-grow + scheduler wakeup is ~50 µs, and the loser goroutine
921        // blocks forever in selectgo — we just need the winner to record
922        // its win).  Five seconds is comfortable headroom even on slow
923        // CI runners.
924        let deadline =
925            std::time::Instant::now() + std::time::Duration::from_secs(5);
926        while wins4.load(Ordering::Acquire) < 1
927            && std::time::Instant::now() < deadline
928        {
929            crate::gosched();
930        }
931
932        // Exactly one goroutine should have received the value.
933        assert_eq!(wins.load(Ordering::Acquire), 1);
934    }
935}