Skip to main content

go_lib/
select.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `selectgo` — the runtime heart of `select { }`.
3//!
4//! Ported from `runtime/select.go`.
5//!
6//! ## How it works
7//!
8//! `selectgo` receives a slice of [`SCase`]s and an optional `has_default`
9//! flag and picks the first case that can proceed without blocking.
10//!
11//! ```text
12//! 1. Build pollorder  — a random permutation of case indices (fairness).
13//! 2. Build lockorder  — case indices sorted by channel address (deadlock prevention).
14//! 3. Acquire all channel locks in lockorder.
15//! 4. First pass (pollorder): check each case for immediate readiness.
16//!    – buffer op:    perform it, release all locks, return winner.
17//!    – direct handoff (partner waiting): dequeue partner's sudog, perform op,
18//!      release all locks, call goready(partner), return winner.
19//!    – send on closed: release all locks, panic.
20//! 5. If has_default: release all locks, return (CASE_DEFAULT, false).
21//! 6. Blocking path:
22//!    a. For every case, allocate a sudog (is_select=true) and enqueue it.
23//!    b. Reset G.selectdone to 0 and G.param to null.
24//!    c. Release all locks.
25//!    d. gopark(Select).
26//! 7. On wakeup (winner wrote G.param = winning sudog):
27//!    a. Acquire all locks in lockorder.
28//!    b. Dequeue all *losing* sudogs (dequeue_sudog is a no-op if a racing
29//!       channel op already removed them).
30//!    c. Release all locks.
31//!    d. Release all sudogs back to the free list.
32//!    e. Return (winner_index, ok).
33//! ```
34//!
35//! ## Type erasure
36//!
37//! Channels are generic (`Hchan<T>`) but `selectgo` must operate over a
38//! heterogeneous set of them.  Each [`SCase`] carries four function pointers
39//! that are monomorphised at the call site (by the `select!` macro):
40//!
41//! | pointer       | purpose                                        |
42//! |---------------|------------------------------------------------|
43//! | `lock_fn`     | acquire the channel's `RawMutex`               |
44//! | `unlock_fn`   | release the channel's `RawMutex`               |
45//! | `try_fn`      | attempt the channel op while all locks held    |
46//! | `enqueue_fn`  | enqueue a sudog on the channel's wait queue    |
47//! | `dequeue_fn`  | remove a specific sudog (O(1) cleanup)         |
48//!
49//! `chan_ptr` is the type-erased `*const Hchan<T>` used as the channel
50//! identity for deduplication and address-ordered locking.
51//!
52//! ## Sentinel index
53//!
54//! `selectgo` returns `CASE_DEFAULT` (`usize::MAX`) when the default case is
55//! taken.  Channel cases use their 0-based index within the slice.
56
57use std::mem::{ManuallyDrop, MaybeUninit};
58use std::ptr;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61
62use crate::chan::{Hchan, Receiver, Sender};
63use crate::runtime::g::{current_g, G, WaitReason};
64use crate::runtime::park::{gopark, goready};
65use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
66
67// ---------------------------------------------------------------------------
68// Public constants
69// ---------------------------------------------------------------------------
70
71/// Return value from [`selectgo`] when the default case is taken.
72pub const CASE_DEFAULT: usize = usize::MAX;
73
74// ---------------------------------------------------------------------------
75// TryResult — outcome of a single case's fast-path attempt
76// ---------------------------------------------------------------------------
77
78/// The result of attempting a channel case while all locks are held.
79#[derive(Debug)]
80pub(crate) enum TryResult {
81    /// Case is not immediately satisfiable.
82    NotReady,
83
84    /// Case completed via a buffer read/write.
85    /// `ok`: true for a normal value, false for a closed-channel receive.
86    Done { ok: bool },
87
88    /// Case completed via a direct goroutine-to-goroutine handoff.
89    /// The partner goroutine has been set up (`param` set, `success` set) but
90    /// not yet made runnable.  Caller must call `goready(gp)` after releasing
91    /// all locks.
92    Handoff { gp: *mut G, ok: bool },
93
94    /// Send attempted on a closed channel.  Caller must release all locks and
95    /// then `panic!("send on closed channel")`.
96    ClosedSend,
97}
98
99// SAFETY: TryResult is only ever used in a single goroutine between lock
100// acquire and lock release; the raw *mut G is not shared across threads.
101unsafe impl Send for TryResult {}
102
103// ---------------------------------------------------------------------------
104// SCase — one arm of a select statement
105// ---------------------------------------------------------------------------
106
107/// One arm of a `select` statement (send, receive, or default).
108///
109/// Constructed by [`recv_case_of`] / [`send_case_of`]; do not build directly.
110#[doc(hidden)]
111pub struct SCase {
112    /// Type-erased `*const Hchan<T>`.  Used as the channel identity for
113    /// deduplication and address-ordered locking.  `null` for a default arm.
114    pub(crate) chan_ptr: *const (),
115
116    /// The sudog enrolled on this channel while the goroutine is parked.
117    /// Set by `selectgo` in the blocking path; `null` for default and
118    /// fast-path returns.
119    pub(crate) sg: *mut Sudog,
120
121    /// Type-erased value pointer.
122    ///
123    /// **Send**: `*mut T` — the value to send (read by the fn pointers).
124    /// **Recv**: `*mut MaybeUninit<T>` — output slot written on success.
125    /// **Default**: `null`.
126    pub(crate) elem: *mut u8,
127
128    // ─── vtable — filled in by select! macro ──────────────────────────────
129
130    /// Acquire the channel's lock.
131    pub(crate) lock_fn: unsafe fn(*const ()),
132
133    /// Release the channel's lock.
134    pub(crate) unlock_fn: unsafe fn(*const ()),
135
136    /// Try the channel operation while all locks are held.
137    ///
138    /// Signature: `(chan_ptr, elem) -> TryResult`
139    ///
140    /// For a send case, `elem` is `*mut T` (the value to send).
141    /// For a recv case, `elem` is `*mut MaybeUninit<T>` (the output slot).
142    pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,
143
144    /// Enqueue `sg` on the channel's sendq or recvq (under the lock).
145    pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),
146
147    /// Remove `sg` from the channel's sendq or recvq (under the lock).
148    /// No-op if `sg` was already removed by a racing channel operation.
149    pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
150}
151
152// SAFETY: SCase is always used within a single goroutine context; the raw
153// pointers are only shared via the scheduler under goroutine-exclusion.
154unsafe impl Send for SCase {}
155
156// ---------------------------------------------------------------------------
157// Lehmer RNG — tiny PRNG for poll-order shuffling
158// ---------------------------------------------------------------------------
159
160/// A Lehmer (Park–Miller) multiplicative congruential PRNG.
161///
162/// Used only to produce the random poll order; cryptographic quality is
163/// not required.  Seeded from the current goroutine's `goid`.
164struct Lehmer(u64);
165
166impl Lehmer {
167    fn from_goid() -> Self {
168        let goid = unsafe {
169            let gp = current_g();
170            if gp.is_null() { 1 } else { (*gp).goid | 1 }
171        };
172        Lehmer(goid | 1) // must be odd and non-zero
173    }
174
175    /// Return a pseudo-random value in `[0, n)`.
176    fn next_usize(&mut self, n: usize) -> usize {
177        // 64-bit Lehmer with multiplier from Knuth TAOCP Vol 2 §3.3.4.
178        self.0 = self.0.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
179        ((self.0 >> 33) as usize) % n
180    }
181}
182
183// ---------------------------------------------------------------------------
184// selectgo
185// ---------------------------------------------------------------------------
186
187/// Run a `select` over the given cases.
188///
189/// `cases` must contain only **channel** cases (send or receive); pass
190/// `has_default = true` if the select has a `default` arm.
191///
192/// Returns `(chosen_index, received_ok)` where:
193/// - `chosen_index` is the 0-based index into `cases`, or [`CASE_DEFAULT`] if
194///   the default arm was taken.
195/// - `received_ok` is `true` for a normal channel recv, `false` if the
196///   channel was closed (and the receive yielded the zero value).  Always
197///   `false` for send/default arms.
198///
199/// # Safety
200/// - All `SCase` fields must be correctly initialised by the caller.
201/// - Must be called from a goroutine stack (not g0 or a bare OS thread).
202#[doc(hidden)]
203pub unsafe fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
204    let n = cases.len();
205
206    // ── 1. Build pollorder (random permutation) ───────────────────────────────
207    let mut pollorder: Vec<usize> = (0..n).collect();
208    let mut rng = Lehmer::from_goid();
209    // Fisher-Yates shuffle.
210    for i in (1..n).rev() {
211        let j = rng.next_usize(i + 1);
212        pollorder.swap(i, j);
213    }
214
215    // ── 2. Build lockorder (sorted by channel address; dedup same channel) ────
216    let mut lockorder: Vec<usize> = (0..n).collect();
217    lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
218    // Deduplicate consecutive equal channels so we don't double-lock.
219    lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);
220
221    // ── 3. Acquire all locks ──────────────────────────────────────────────────
222    for &i in &lockorder {
223        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
224    }
225
226    // ── 4. First pass: check each case in poll order ──────────────────────────
227    for &i in &pollorder {
228        let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
229        match result {
230            TryResult::NotReady => continue,
231
232            TryResult::Done { ok } => {
233                // Buffer op completed under the locks; release all and return.
234                for &j in &lockorder {
235                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
236                }
237                return (i, ok);
238            }
239
240            TryResult::Handoff { gp, ok } => {
241                // Partner dequeued and set up; release locks, wake partner.
242                for &j in &lockorder {
243                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
244                }
245                unsafe { goready(gp) };
246                return (i, ok);
247            }
248
249            TryResult::ClosedSend => {
250                for &j in &lockorder {
251                    unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
252                }
253                panic!("send on closed channel");
254            }
255        }
256    }
257
258    // ── 5. Default case ───────────────────────────────────────────────────────
259    if has_default {
260        for &i in &lockorder {
261            unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
262        }
263        return (CASE_DEFAULT, false);
264    }
265
266    // ── 6. Blocking path: enqueue sudogs on all channels ─────────────────────
267    let gp = current_g();
268    debug_assert!(!gp.is_null(), "selectgo: called from g0");
269
270    for case in cases.iter_mut() {
271        let sg = acquire_sudog();
272        unsafe {
273            (*sg).g         = gp;
274            (*sg).elem      = case.elem;
275            (*sg).is_select = true;
276            (*sg).success   = false;
277            (*sg).c         = case.chan_ptr as *mut u8;
278        }
279        case.sg = sg;
280        unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
281    }
282
283    // Reset selectdone so this goroutine can be claimed by exactly one case.
284    unsafe { (*gp).selectdone.store(0, Ordering::Release) };
285    unsafe { (*gp).param = ptr::null_mut() };
286
287    // ── 6c. Release all locks and park ────────────────────────────────────────
288    for &i in &lockorder {
289        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
290    }
291
292    unsafe { gopark(WaitReason::Select) };
293
294    // ── 7. Woken: find winner, clean up losers ────────────────────────────────
295    //
296    // The winning channel operation stored the winning sudog in G.param.
297    let sg_winner = unsafe { (*gp).param as *mut Sudog };
298    unsafe { (*gp).param = ptr::null_mut() };
299    let ok = unsafe { (*sg_winner).success };
300
301    // Identify which case won.
302    let winner = cases
303        .iter()
304        .position(|c| c.sg == sg_winner)
305        .expect("selectgo: winning sudog not found in cases");
306
307    // 7a. Re-acquire all locks.
308    for &i in &lockorder {
309        unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
310    }
311
312    // 7b. Dequeue all losing sudogs from their channels.
313    for (i, case) in cases.iter_mut().enumerate() {
314        if i == winner { continue; }
315        let sg = case.sg;
316        unsafe { (case.dequeue_fn)(case.chan_ptr, sg) };
317    }
318
319    // 7c. Release all locks.
320    for &i in &lockorder {
321        unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
322    }
323
324    // 7d. Release all sudogs back to the pool.
325    for case in cases.iter_mut() {
326        let sg = case.sg;
327        case.sg = ptr::null_mut();
328        unsafe {
329            (*sg).g    = ptr::null_mut();
330            (*sg).elem = ptr::null_mut();
331            (*sg).c    = ptr::null_mut();
332            release_sudog(sg);
333        }
334    }
335
336    (winner, ok)
337}
338
339// ---------------------------------------------------------------------------
340// Generic vtable functions — monomorphised for each T at the call site
341// ---------------------------------------------------------------------------
342
343pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
344    (*(p as *const Hchan<T>)).mutex.lock();
345}
346
347pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
348    (*(p as *const Hchan<T>)).mutex.unlock();
349}
350
351pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
352    p: *const (),
353    elem: *mut u8,
354) -> TryResult {
355    let hchan = &*(p as *const Hchan<T>);
356    let state = &mut *hchan.state.get();
357
358    if state.closed {
359        return TryResult::ClosedSend;
360    }
361
362    // Waiting receiver?
363    let recv_sg = state.recvq.dequeue();
364    if !recv_sg.is_null() {
365        let gp = (*recv_sg).g;
366        let ep = (*recv_sg).elem as *mut MaybeUninit<T>;
367        if !ep.is_null() {
368            // ManuallyDrop<T> / T have identical layout → ptr::read is safe.
369            (*ep).write(ptr::read(elem as *const T));
370        }
371        (*recv_sg).success = true;
372        (*gp).param        = recv_sg as *mut u8;
373        return TryResult::Handoff { gp, ok: true };
374    }
375
376    // Buffer space?
377    if state.buf.len() < state.cap {
378        state.buf.push_back(ptr::read(elem as *const T));
379        return TryResult::Done { ok: true };
380    }
381
382    TryResult::NotReady
383}
384
385pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
386    p: *const (),
387    elem: *mut u8,
388) -> TryResult {
389    let hchan = &*(p as *const Hchan<T>);
390    let state = &mut *hchan.state.get();
391
392    // Waiting sender?
393    let send_sg = state.sendq.dequeue();
394    if !send_sg.is_null() {
395        let gp    = (*send_sg).g;
396        let ep    = (*send_sg).elem as *mut MaybeUninit<T>; // may be ManuallyDrop<T>
397        let boxed = (*send_sg).boxed_elem;
398        let val = if state.cap == 0 {
399            let v = (*ep).assume_init_read();
400            if boxed { let _ = Box::from_raw(ep); }
401            (*send_sg).elem = ptr::null_mut();
402            v
403        } else {
404            let head = state.buf.pop_front().unwrap();
405            let sv   = (*ep).assume_init_read();
406            if boxed { let _ = Box::from_raw(ep); }
407            (*send_sg).elem = ptr::null_mut();
408            state.buf.push_back(sv);
409            head
410        };
411        (*(elem as *mut MaybeUninit<T>)).write(val);
412        (*send_sg).success = true;
413        (*gp).param        = send_sg as *mut u8;
414        return TryResult::Handoff { gp, ok: true };
415    }
416
417    // Buffer data?
418    if !state.buf.is_empty() {
419        let val = state.buf.pop_front().unwrap();
420        (*(elem as *mut MaybeUninit<T>)).write(val);
421        return TryResult::Done { ok: true };
422    }
423
424    // Closed and empty → leave elem uninitialised; caller checks ok=false.
425    if state.closed {
426        return TryResult::Done { ok: false };
427    }
428
429    TryResult::NotReady
430}
431
432pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
433    let hchan = &*(p as *const Hchan<T>);
434    (*hchan.state.get()).sendq.enqueue(sg);
435}
436
437pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
438    let hchan = &*(p as *const Hchan<T>);
439    (*hchan.state.get()).recvq.enqueue(sg);
440}
441
442pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
443    let hchan = &*(p as *const Hchan<T>);
444    (*hchan.state.get()).sendq.dequeue_sudog(sg);
445}
446
447pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
448    let hchan = &*(p as *const Hchan<T>);
449    (*hchan.state.get()).recvq.dequeue_sudog(sg);
450}
451
452// ---------------------------------------------------------------------------
453// Public factory functions — used by the select! macro
454// ---------------------------------------------------------------------------
455
456/// Build a receive [`SCase`] for use in [`selectgo`].
457///
458/// `slot` must point to a valid (possibly uninitialised) `MaybeUninit<T>` that
459/// outlives the `selectgo` call.  On success (`ok = true`) the slot holds the
460/// received value; on `ok = false` (channel closed) the slot is uninitialised.
461///
462/// Called by the `select!` macro; not intended for direct use.
463#[doc(hidden)]
464pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut MaybeUninit<T>) -> SCase {
465    SCase {
466        chan_ptr:    Arc::as_ptr(rx.hchan()) as *const (),
467        sg:          ptr::null_mut(),
468        elem:        slot as *mut u8,
469        lock_fn:     lock_chan::<T>,
470        unlock_fn:   unlock_chan::<T>,
471        try_fn:      try_recv_chan::<T>,
472        enqueue_fn:  enqueue_recv_chan::<T>,
473        dequeue_fn:  dequeue_recv_chan::<T>,
474    }
475}
476
477/// Build a send [`SCase`] for use in [`selectgo`].
478///
479/// `val` must point to a `ManuallyDrop<T>` that outlives the `selectgo` call.
480/// If the case wins, the value is moved into the channel and the caller must
481/// **not** drop `*val`.  If the case loses, the caller must call
482/// `ManuallyDrop::drop(val)` to avoid a leak.
483///
484/// Called by the `select!` macro; not intended for direct use.
485#[doc(hidden)]
486pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
487    SCase {
488        chan_ptr:    Arc::as_ptr(tx.hchan()) as *const (),
489        sg:          ptr::null_mut(),
490        elem:        val as *mut u8,
491        lock_fn:     lock_chan::<T>,
492        unlock_fn:   unlock_chan::<T>,
493        try_fn:      try_send_chan::<T>,
494        enqueue_fn:  enqueue_send_chan::<T>,
495        dequeue_fn:  dequeue_send_chan::<T>,
496    }
497}
498
499// ---------------------------------------------------------------------------
500// Tests
501// ---------------------------------------------------------------------------
502
503#[cfg(all(test, not(loom)))]
504#[allow(unused_unsafe)] // some closures call unsafe fn inside an outer unsafe{} block
505mod tests {
506    use super::*;
507    use crate::chan::{chan, Hchan};
508    use crate::runtime::sudog::Sudog;
509    use crate::runtime::sched::run_impl;
510    use std::mem::MaybeUninit;
511    use std::ptr;
512    use std::sync::atomic::{AtomicI32, Ordering};
513    use std::sync::Arc;
514
515    // ─── vtable helpers for Hchan<i32> ────────────────────────────────────────
516
517    unsafe fn lock_i32(p: *const ()) {
518        (*(p as *const Hchan<i32>)).mutex.lock();
519    }
520    unsafe fn unlock_i32(p: *const ()) {
521        unsafe { (*(p as *const Hchan<i32>)).mutex.unlock() };
522    }
523
524    /// try_fn for a **send** case on `Hchan<i32>`.
525    ///
526    /// `elem` points to a `i32` to send.  Checks recvq and buffer.
527    unsafe fn try_send_i32(p: *const (), elem: *mut u8) -> TryResult {
528        let hchan = &*(p as *const Hchan<i32>);
529        // SAFETY: caller holds the channel lock.
530        let state = &mut *hchan.state.get();
531
532        if state.closed {
533            return TryResult::ClosedSend;
534        }
535
536        // Waiting receiver?
537        let recv_sg = state.recvq.dequeue();
538        if !recv_sg.is_null() {
539            let gp  = (*recv_sg).g;
540            let ep  = (*recv_sg).elem as *mut MaybeUninit<i32>;
541            if !ep.is_null() {
542                (*ep).write(ptr::read(elem as *const i32));
543            }
544            (*recv_sg).success = true;
545            (*gp).param        = recv_sg as *mut u8;
546            return TryResult::Handoff { gp, ok: true };
547        }
548
549        // Buffer space?
550        if state.buf.len() < state.cap {
551            state.buf.push_back(ptr::read(elem as *const i32));
552            return TryResult::Done { ok: true };
553        }
554
555        TryResult::NotReady
556    }
557
558    /// try_fn for a **recv** case on `Hchan<i32>`.
559    ///
560    /// `elem` points to a `MaybeUninit<i32>` output slot.
561    unsafe fn try_recv_i32(p: *const (), elem: *mut u8) -> TryResult {
562        let hchan = &*(p as *const Hchan<i32>);
563        let state = &mut *hchan.state.get();
564
565        // Waiting sender?
566        let send_sg = state.sendq.dequeue();
567        if !send_sg.is_null() {
568            let gp    = (*send_sg).g;
569            let ep    = (*send_sg).elem as *mut MaybeUninit<i32>;
570            let boxed = (*send_sg).boxed_elem;
571            let val = if state.cap == 0 {
572                let v = (*ep).assume_init_read();
573                if boxed { let _ = Box::from_raw(ep); }
574                (*send_sg).elem = ptr::null_mut();
575                v
576            } else {
577                let head = state.buf.pop_front().unwrap();
578                let sv   = (*ep).assume_init_read();
579                if boxed { let _ = Box::from_raw(ep); }
580                (*send_sg).elem = ptr::null_mut();
581                state.buf.push_back(sv);
582                head
583            };
584            (*(elem as *mut MaybeUninit<i32>)).write(val);
585            (*send_sg).success = true;
586            (*gp).param        = send_sg as *mut u8;
587            return TryResult::Handoff { gp, ok: true };
588        }
589
590        // Buffer has data?
591        if !state.buf.is_empty() {
592            let val = state.buf.pop_front().unwrap();
593            (*(elem as *mut MaybeUninit<i32>)).write(val);
594            return TryResult::Done { ok: true };
595        }
596
597        // Closed and empty?
598        if state.closed {
599            (*(elem as *mut MaybeUninit<i32>)) = MaybeUninit::uninit();
600            return TryResult::Done { ok: false };
601        }
602
603        TryResult::NotReady
604    }
605
606    unsafe fn enqueue_send_i32(p: *const (), sg: *mut Sudog) {
607        let hchan = &*(p as *const Hchan<i32>);
608        (*hchan.state.get()).sendq.enqueue(sg);
609    }
610    unsafe fn enqueue_recv_i32(p: *const (), sg: *mut Sudog) {
611        let hchan = &*(p as *const Hchan<i32>);
612        (*hchan.state.get()).recvq.enqueue(sg);
613    }
614    unsafe fn dequeue_send_sg_i32(p: *const (), sg: *mut Sudog) {
615        let hchan = &*(p as *const Hchan<i32>);
616        (*hchan.state.get()).sendq.dequeue_sudog(sg);
617    }
618    unsafe fn dequeue_recv_sg_i32(p: *const (), sg: *mut Sudog) {
619        let hchan = &*(p as *const Hchan<i32>);
620        (*hchan.state.get()).recvq.dequeue_sudog(sg);
621    }
622
623    /// Build an `SCase` for a buffered-send of `val` on channel `h`.
624    fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
625        SCase {
626            chan_ptr:   Arc::as_ptr(h) as *const (),
627            sg:        ptr::null_mut(),
628            elem:      val as *mut i32 as *mut u8,
629            lock_fn:   lock_i32,
630            unlock_fn: unlock_i32,
631            try_fn:    try_send_i32,
632            enqueue_fn: enqueue_send_i32,
633            dequeue_fn: dequeue_send_sg_i32,
634        }
635    }
636
637    /// Build an `SCase` for a recv on channel `h`, output into `slot`.
638    fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut MaybeUninit<i32>) -> SCase {
639        SCase {
640            chan_ptr:   Arc::as_ptr(h) as *const (),
641            sg:        ptr::null_mut(),
642            elem:      slot as *mut MaybeUninit<i32> as *mut u8,
643            lock_fn:   lock_i32,
644            unlock_fn: unlock_i32,
645            try_fn:    try_recv_i32,
646            enqueue_fn: enqueue_recv_i32,
647            dequeue_fn: dequeue_recv_sg_i32,
648        }
649    }
650
651    // ── Fast-path tests (no goroutine park) ───────────────────────────────────
652
653    /// select { rx.recv() => ... ; default } on a buffered channel with data.
654    #[test]
655    fn fast_recv_buffered() {
656        run_impl(|| {
657            let (tx, rx) = chan::<i32>(4);
658            tx.send(42);
659
660            let mut slot = MaybeUninit::<i32>::uninit();
661            let mut cases = [recv_case(rx.hchan(), &mut slot)];
662            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
663
664            assert_eq!(idx, 0, "should pick recv case");
665            assert!(ok,        "should be ok (not closed)");
666            assert_eq!(unsafe { slot.assume_init() }, 42);
667        });
668    }
669
670    /// select { tx.send(v) => ... ; default } on a channel with buffer space.
671    #[test]
672    fn fast_send_buffered() {
673        run_impl(|| {
674            let (tx, rx) = chan::<i32>(4);
675
676            let mut val = 99_i32;
677            let mut cases = [send_case(tx.hchan(), &mut val)];
678            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
679
680            assert_eq!(idx, 0);
681            assert!(ok, "buffered send completes with ok=true");
682            assert_eq!(rx.recv(), Some(99));
683        });
684    }
685
686    /// select { ... ; default } when no case is ready → default taken.
687    #[test]
688    fn default_taken_when_not_ready() {
689        run_impl(|| {
690            let (_tx, rx) = chan::<i32>(0);
691
692            let mut slot = MaybeUninit::<i32>::uninit();
693            let mut cases = [recv_case(rx.hchan(), &mut slot)];
694            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
695
696            assert_eq!(idx, CASE_DEFAULT);
697            assert!(!ok);
698        });
699    }
700
701    /// select recv on closed+empty channel returns ok=false.
702    #[test]
703    fn recv_closed_empty() {
704        run_impl(|| {
705            let (tx, rx) = chan::<i32>(0);
706            tx.close();
707
708            let mut slot = MaybeUninit::<i32>::uninit();
709            let mut cases = [recv_case(rx.hchan(), &mut slot)];
710            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
711
712            assert_eq!(idx, 0);
713            assert!(!ok, "recv from closed returns ok=false");
714        });
715    }
716
717    // ── Multi-case selection ──────────────────────────────────────────────────
718
719    /// Two recv cases; only one channel has data — that case wins.
720    #[test]
721    fn multi_case_first_ready_wins() {
722        run_impl(|| {
723            let (tx1, rx1) = chan::<i32>(1);
724            let (_tx2, rx2) = chan::<i32>(1);
725
726            tx1.send(7);
727
728            let mut s1 = MaybeUninit::<i32>::uninit();
729            let mut s2 = MaybeUninit::<i32>::uninit();
730            let mut cases = [
731                recv_case(rx1.hchan(), &mut s1),
732                recv_case(rx2.hchan(), &mut s2),
733            ];
734            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
735
736            assert_eq!(idx, 0);
737            assert!(ok);
738            assert_eq!(unsafe { s1.assume_init() }, 7);
739        });
740    }
741
742    // ── Blocking path tests (goroutine park/unpark) ───────────────────────────
743
744    /// Goroutine blocks on select recv, then a sender unblocks it.
745    #[test]
746    fn blocking_recv_unblocked_by_send() {
747        use crate::runtime::sched::spawn_goroutine;
748
749        let result = Arc::new(AtomicI32::new(-1));
750        let result2 = Arc::clone(&result);
751
752        run_impl(move || {
753            let (tx, rx) = chan::<i32>(0);
754
755            unsafe {
756                spawn_goroutine(move || {
757                    // Sender: wait a bit, then send.
758                    crate::gosched();
759                    tx.send(55);
760                });
761            }
762
763            let mut slot = MaybeUninit::<i32>::uninit();
764            let mut cases = [recv_case(rx.hchan(), &mut slot)];
765            // No default → will block.
766            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
767
768            assert_eq!(idx, 0);
769            assert!(ok);
770            result2.store(unsafe { slot.assume_init() }, Ordering::Relaxed);
771        });
772
773        assert_eq!(result.load(Ordering::Acquire), 55);
774    }
775
776    /// Goroutine blocks on select send, then a receiver unblocks it.
777    #[test]
778    fn blocking_send_unblocked_by_recv() {
779        use crate::runtime::sched::spawn_goroutine;
780
781        run_impl(|| {
782            let (tx, rx) = chan::<i32>(0);
783
784            unsafe {
785                spawn_goroutine(move || {
786                    crate::gosched();
787                    // Consume the value the select sends.
788                    let _ = rx.recv();
789                });
790            }
791
792            let mut val = 77_i32;
793            let mut cases = [send_case(tx.hchan(), &mut val)];
794            let (idx, _ok) = unsafe { selectgo(&mut cases, false) };
795
796            assert_eq!(idx, 0);
797        });
798    }
799
800    /// Two goroutines racing on the same channel; exactly one wins via select.
801    #[test]
802    fn select_race_one_winner() {
803        use crate::runtime::sched::spawn_goroutine;
804
805        let wins = Arc::new(AtomicI32::new(0));
806        let wins2 = Arc::clone(&wins);
807        let wins3 = Arc::clone(&wins);
808
809        run_impl(move || {
810            let (tx, rx) = chan::<i32>(1);
811            tx.send(1); // one value in the buffer
812
813            unsafe {
814                spawn_goroutine({
815                    let wins = Arc::clone(&wins2);
816                    let rx = rx.clone();
817                    move || {
818                        let mut slot = MaybeUninit::<i32>::uninit();
819                        let mut cases = [recv_case(rx.hchan(), &mut slot)];
820                        let (idx, ok) = unsafe { selectgo(&mut cases, true) };
821                        if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
822                    }
823                });
824            }
825
826            unsafe {
827                spawn_goroutine({
828                    let wins = Arc::clone(&wins3);
829                    let rx = rx.clone();
830                    move || {
831                        let mut slot = MaybeUninit::<i32>::uninit();
832                        let mut cases = [recv_case(rx.hchan(), &mut slot)];
833                        let (idx, ok) = unsafe { selectgo(&mut cases, true) };
834                        if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
835                    }
836                });
837            }
838
839            // Give goroutines time to race.
840            for _ in 0..200 { crate::gosched(); }
841        });
842
843        // Exactly one goroutine should have received the value.
844        assert_eq!(wins.load(Ordering::Acquire), 1);
845    }
846}