go_lib/select.rs
1// SPDX-License-Identifier: Apache-2.0
2//! `selectgo` — the runtime heart of `select { }`.
3//!
4//! Ported from `runtime/select.go`.
5//!
6//! ## How it works
7//!
8//! `selectgo` receives a slice of [`SCase`]s and an optional `has_default`
9//! flag and picks the first case that can proceed without blocking.
10//!
11//! ```text
12//! 1. Build pollorder — a random permutation of case indices (fairness).
13//! 2. Build lockorder — case indices sorted by channel address (deadlock prevention).
14//! 3. Acquire all channel locks in lockorder.
15//! 4. First pass (pollorder): check each case for immediate readiness.
16//! – buffer op: perform it, release all locks, return winner.
17//! – direct handoff (partner waiting): dequeue partner's sudog, perform op,
18//! release all locks, call goready(partner), return winner.
19//! – send on closed: release all locks, panic.
20//! 5. If has_default: release all locks, return (CASE_DEFAULT, false).
21//! 6. Blocking path:
22//! a. For every case, allocate a sudog (is_select=true) and enqueue it.
23//! b. Reset G.selectdone to 0 and G.param to null.
24//! c. Release all locks.
25//! d. gopark(Select).
26//! 7. On wakeup (winner wrote G.param = winning sudog):
27//! a. Acquire all locks in lockorder.
28//! b. Dequeue all *losing* sudogs (dequeue_sudog is a no-op if a racing
29//! channel op already removed them).
30//! c. Release all locks.
31//! d. Release all sudogs back to the free list.
32//! e. Return (winner_index, ok).
33//! ```
34//!
35//! ## Type erasure
36//!
37//! Channels are generic (`Hchan<T>`) but `selectgo` must operate over a
38//! heterogeneous set of them. Each [`SCase`] carries four function pointers
39//! that are monomorphised at the call site (by the `select!` macro):
40//!
41//! | pointer | purpose |
42//! |---------------|------------------------------------------------|
43//! | `lock_fn` | acquire the channel's `RawMutex` |
44//! | `unlock_fn` | release the channel's `RawMutex` |
45//! | `try_fn` | attempt the channel op while all locks held |
46//! | `enqueue_fn` | enqueue a sudog on the channel's wait queue |
47//! | `dequeue_fn` | remove a specific sudog (O(1) cleanup) |
48//!
49//! `chan_ptr` is the type-erased `*const Hchan<T>` used as the channel
50//! identity for deduplication and address-ordered locking.
51//!
52//! ## Sentinel index
53//!
54//! `selectgo` returns `CASE_DEFAULT` (`usize::MAX`) when the default case is
55//! taken. Channel cases use their 0-based index within the slice.
56
57use std::mem::ManuallyDrop;
58use std::ptr;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61
62use crate::chan::{Hchan, Receiver, Sender};
63use crate::runtime::g::{current_g, G, WaitReason};
64use crate::runtime::park::{gopark, goready};
65use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
66
67// ---------------------------------------------------------------------------
68// Public constants
69// ---------------------------------------------------------------------------
70
71/// Return value from [`selectgo`] when the default case is taken.
72pub const CASE_DEFAULT: usize = usize::MAX;
73
74// ---------------------------------------------------------------------------
75// TryResult — outcome of a single case's fast-path attempt
76// ---------------------------------------------------------------------------
77
78/// The result of attempting a channel case while all locks are held.
79#[derive(Debug)]
80pub(crate) enum TryResult {
81 /// Case is not immediately satisfiable.
82 NotReady,
83
84 /// Case completed via a buffer read/write.
85 /// `ok`: true for a normal value, false for a closed-channel receive.
86 Done { ok: bool },
87
88 /// Case completed via a direct goroutine-to-goroutine handoff.
89 /// The partner goroutine has been set up (`param` set, `success` set) but
90 /// not yet made runnable. Caller must call `goready(gp)` after releasing
91 /// all locks.
92 Handoff { gp: *mut G, ok: bool },
93
94 /// Send attempted on a closed channel. Caller must release all locks and
95 /// then `panic!("send on closed channel")`.
96 ClosedSend,
97}
98
99// SAFETY: TryResult is only ever used in a single goroutine between lock
100// acquire and lock release; the raw *mut G is not shared across threads.
101unsafe impl Send for TryResult {}
102
103// ---------------------------------------------------------------------------
104// SCase — one arm of a select statement
105// ---------------------------------------------------------------------------
106
107/// One arm of a `select` statement (send, receive, or default).
108///
109/// Constructed by [`recv_case_of`] / [`send_case_of`]; do not build directly.
110#[doc(hidden)]
111pub struct SCase {
112 /// Type-erased `*const Hchan<T>`. Used as the channel identity for
113 /// deduplication and address-ordered locking. `null` for a default arm.
114 pub(crate) chan_ptr: *const (),
115
116 /// The sudog enrolled on this channel while the goroutine is parked.
117 /// Set by `selectgo` in the blocking path; `null` for default and
118 /// fast-path returns.
119 pub(crate) sg: *mut Sudog,
120
121 /// Type-erased value pointer.
122 ///
123 /// **Send**: `*mut ManuallyDrop<T>` — the value to send (read by the fn pointers).
124 /// **Recv**: `*mut Option<T>` — output slot; written as `Some(val)` on a
125 /// successful receive, left as `None` when the channel is closed.
126 /// **Default**: `null`.
127 pub(crate) elem: *mut u8,
128
129 // ─── vtable — filled in by select! macro ──────────────────────────────
130
131 /// Acquire the channel's lock.
132 pub(crate) lock_fn: unsafe fn(*const ()),
133
134 /// Release the channel's lock.
135 pub(crate) unlock_fn: unsafe fn(*const ()),
136
137 /// Try the channel operation while all locks are held.
138 ///
139 /// Signature: `(chan_ptr, elem) -> TryResult`
140 ///
141 /// For a send case, `elem` is `*mut ManuallyDrop<T>` (the value to send).
142 /// For a recv case, `elem` is `*mut Option<T>` (the output slot).
143 pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,
144
145 /// Enqueue `sg` on the channel's sendq or recvq (under the lock).
146 pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),
147
148 /// Remove `sg` from the channel's sendq or recvq (under the lock).
149 /// No-op if `sg` was already removed by a racing channel operation.
150 pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
151}
152
153// SAFETY: SCase is always used within a single goroutine context; the raw
154// pointers are only shared via the scheduler under goroutine-exclusion.
155unsafe impl Send for SCase {}
156
157// ---------------------------------------------------------------------------
158// Lehmer RNG — tiny PRNG for poll-order shuffling
159// ---------------------------------------------------------------------------
160
161/// A Lehmer (Park–Miller) multiplicative congruential PRNG.
162///
163/// Used only to produce the random poll order; cryptographic quality is
164/// not required. Seeded from the current goroutine's `goid`.
165struct Lehmer(u64);
166
167impl Lehmer {
168 fn from_goid() -> Self {
169 let gp = current_g();
170 // SAFETY: gp is only dereferenced after the null check.
171 let goid = if gp.is_null() { 1 } else { (unsafe { (*gp).goid }) | 1 };
172 Lehmer(goid | 1) // must be odd and non-zero
173 }
174
175 /// Return a pseudo-random value in `[0, n)`.
176 fn next_usize(&mut self, n: usize) -> usize {
177 // 64-bit Lehmer with multiplier from Knuth TAOCP Vol 2 §3.3.4.
178 self.0 = self.0.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
179 ((self.0 >> 33) as usize) % n
180 }
181}
182
183// ---------------------------------------------------------------------------
184// selectgo
185// ---------------------------------------------------------------------------
186
187/// Run a `select` over the given cases.
188///
189/// `cases` must contain only **channel** cases (send or receive); pass
190/// `has_default = true` if the select has a `default` arm.
191///
192/// Returns `(chosen_index, received_ok)` where:
193/// - `chosen_index` is the 0-based index into `cases`, or [`CASE_DEFAULT`] if
194/// the default arm was taken.
195/// - `received_ok` is `true` for a normal channel recv, `false` if the
196/// channel was closed (and the receive wrote `None` into the slot). Always
197/// `false` for send/default arms.
198///
199/// # Preconditions
200///
201/// - All `SCase` values must be created by [`recv_case_of`] or [`send_case_of`].
202/// - Must be called from a goroutine stack (not g0 or a bare OS thread).
203/// A `debug_assert` fires in debug builds if this is violated.
204///
205/// This function is intended only for use by the `select!` macro.
206#[doc(hidden)]
207pub fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
208 let n = cases.len();
209
210 // ── 1. Build pollorder (random permutation) ───────────────────────────────
211 let mut pollorder: Vec<usize> = (0..n).collect();
212 let mut rng = Lehmer::from_goid();
213 // Fisher-Yates shuffle.
214 for i in (1..n).rev() {
215 let j = rng.next_usize(i + 1);
216 pollorder.swap(i, j);
217 }
218
219 // ── 2. Build lockorder (sorted by channel address; dedup same channel) ────
220 let mut lockorder: Vec<usize> = (0..n).collect();
221 lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
222 // Deduplicate consecutive equal channels so we don't double-lock.
223 lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);
224
225 // ── 3. Acquire all locks ──────────────────────────────────────────────────
226 for &i in &lockorder {
227 unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
228 }
229
230 // ── 4. First pass: check each case in poll order ──────────────────────────
231 for &i in &pollorder {
232 let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
233 match result {
234 TryResult::NotReady => continue,
235
236 TryResult::Done { ok } => {
237 // Buffer op completed under the locks; release all and return.
238 for &j in &lockorder {
239 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
240 }
241 return (i, ok);
242 }
243
244 TryResult::Handoff { gp, ok } => {
245 // Partner dequeued and set up; release locks, wake partner.
246 for &j in &lockorder {
247 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
248 }
249 unsafe { goready(gp) };
250 return (i, ok);
251 }
252
253 TryResult::ClosedSend => {
254 for &j in &lockorder {
255 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
256 }
257 panic!("send on closed channel");
258 }
259 }
260 }
261
262 // ── 5. Default case ───────────────────────────────────────────────────────
263 if has_default {
264 for &i in &lockorder {
265 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
266 }
267 return (CASE_DEFAULT, false);
268 }
269
270 // ── 6. Blocking path: enqueue sudogs on all channels ─────────────────────
271 let gp = current_g();
272 debug_assert!(!gp.is_null(), "selectgo: called from g0");
273
274 for case in cases.iter_mut() {
275 let sg = acquire_sudog();
276 unsafe {
277 (*sg).g = gp;
278 (*sg).elem = case.elem;
279 (*sg).is_select = true;
280 (*sg).success = false;
281 (*sg).c = case.chan_ptr as *mut u8;
282 }
283 case.sg = sg;
284 unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
285 }
286
287 // Reset selectdone so this goroutine can be claimed by exactly one case.
288 unsafe { (*gp).selectdone.store(0, Ordering::Release) };
289 unsafe { (*gp).param = ptr::null_mut() };
290
291 // ── 6c. Release all locks and park ────────────────────────────────────────
292 for &i in &lockorder {
293 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
294 }
295
296 gopark(WaitReason::Select);
297
298 // ── 7. Woken: find winner, clean up losers ────────────────────────────────
299 //
300 // The winning channel operation stored the winning sudog in G.param.
301 let sg_winner = unsafe { (*gp).param as *mut Sudog };
302 unsafe { (*gp).param = ptr::null_mut() };
303 let ok = unsafe { (*sg_winner).success };
304
305 // Identify which case won.
306 let winner = cases
307 .iter()
308 .position(|c| c.sg == sg_winner)
309 .expect("selectgo: winning sudog not found in cases");
310
311 // 7a. Re-acquire all locks.
312 for &i in &lockorder {
313 unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
314 }
315
316 // 7b. Dequeue all losing sudogs from their channels.
317 for (i, case) in cases.iter_mut().enumerate() {
318 if i == winner { continue; }
319 let sg = case.sg;
320 unsafe { (case.dequeue_fn)(case.chan_ptr, sg) };
321 }
322
323 // 7c. Release all locks.
324 for &i in &lockorder {
325 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
326 }
327
328 // 7d. Release all sudogs back to the pool.
329 for case in cases.iter_mut() {
330 let sg = case.sg;
331 case.sg = ptr::null_mut();
332 unsafe {
333 (*sg).g = ptr::null_mut();
334 (*sg).elem = ptr::null_mut();
335 (*sg).c = ptr::null_mut();
336 release_sudog(sg);
337 }
338 }
339
340 (winner, ok)
341}
342
343// ---------------------------------------------------------------------------
344// Generic vtable functions — monomorphised for each T at the call site
345// ---------------------------------------------------------------------------
346
347pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
348 (*(p as *const Hchan<T>)).mutex.lock();
349}
350
351pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
352 (*(p as *const Hchan<T>)).mutex.unlock();
353}
354
355pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
356 p: *const (),
357 elem: *mut u8,
358) -> TryResult {
359 let hchan = &*(p as *const Hchan<T>);
360 let state = &mut *hchan.state.get();
361
362 if state.closed {
363 return TryResult::ClosedSend;
364 }
365
366 // Waiting receiver?
367 let recv_sg = state.recvq.dequeue();
368 if !recv_sg.is_null() {
369 let gp = (*recv_sg).g;
370 // elem is *mut ManuallyDrop<T> (send slot); recv_sg.elem is *mut Option<T>.
371 let ep = (*recv_sg).elem as *mut Option<T>;
372 if !ep.is_null() {
373 *ep = Some(ptr::read(elem as *const T));
374 }
375 (*recv_sg).success = true;
376 (*gp).param = recv_sg as *mut u8;
377 return TryResult::Handoff { gp, ok: true };
378 }
379
380 // Buffer space?
381 if state.buf.len() < state.cap {
382 state.buf.push_back(ptr::read(elem as *const T));
383 return TryResult::Done { ok: true };
384 }
385
386 TryResult::NotReady
387}
388
389pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
390 p: *const (),
391 elem: *mut u8, // *mut Option<T>
392) -> TryResult {
393 let hchan = &*(p as *const Hchan<T>);
394 let state = &mut *hchan.state.get();
395
396 // Waiting sender?
397 let send_sg = state.sendq.dequeue();
398 if !send_sg.is_null() {
399 let gp = (*send_sg).g;
400 // send_sg.elem is *mut ManuallyDrop<T>; use ManuallyDrop explicitly so
401 // Box::from_raw for the boxed path does not run T's destructor.
402 let ep = (*send_sg).elem as *mut ManuallyDrop<T>;
403 let boxed = (*send_sg).boxed_elem;
404 let val = if state.cap == 0 {
405 let v = ManuallyDrop::into_inner(ptr::read(ep));
406 if boxed { let _ = Box::from_raw(ep); }
407 (*send_sg).elem = ptr::null_mut();
408 v
409 } else {
410 let head = state.buf.pop_front().unwrap();
411 let sv = ManuallyDrop::into_inner(ptr::read(ep));
412 if boxed { let _ = Box::from_raw(ep); }
413 (*send_sg).elem = ptr::null_mut();
414 state.buf.push_back(sv);
415 head
416 };
417 *(elem as *mut Option<T>) = Some(val);
418 (*send_sg).success = true;
419 (*gp).param = send_sg as *mut u8;
420 return TryResult::Handoff { gp, ok: true };
421 }
422
423 // Buffer data?
424 if !state.buf.is_empty() {
425 let val = state.buf.pop_front().unwrap();
426 *(elem as *mut Option<T>) = Some(val);
427 return TryResult::Done { ok: true };
428 }
429
430 // Closed and empty → elem stays None; caller checks ok=false.
431 if state.closed {
432 return TryResult::Done { ok: false };
433 }
434
435 TryResult::NotReady
436}
437
438pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
439 let hchan = &*(p as *const Hchan<T>);
440 (*hchan.state.get()).sendq.enqueue(sg);
441}
442
443pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
444 let hchan = &*(p as *const Hchan<T>);
445 (*hchan.state.get()).recvq.enqueue(sg);
446}
447
448pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
449 let hchan = &*(p as *const Hchan<T>);
450 (*hchan.state.get()).sendq.dequeue_sudog(sg);
451}
452
453pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
454 let hchan = &*(p as *const Hchan<T>);
455 (*hchan.state.get()).recvq.dequeue_sudog(sg);
456}
457
458// ---------------------------------------------------------------------------
459// Public factory functions — used by the select! macro
460// ---------------------------------------------------------------------------
461
462/// Build a receive [`SCase`] for use in [`selectgo`].
463///
464/// `slot` must point to an `Option<T>` initialised to `None` that outlives the
465/// `selectgo` call. On a successful receive (`ok = true`) the slot is
466/// `Some(value)`; on `ok = false` (channel closed) the slot remains `None`.
467///
468/// Called by the `select!` macro; not intended for direct use.
469#[doc(hidden)]
470pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut Option<T>) -> SCase {
471 SCase {
472 chan_ptr: Arc::as_ptr(rx.hchan()) as *const (),
473 sg: ptr::null_mut(),
474 elem: slot as *mut u8,
475 lock_fn: lock_chan::<T>,
476 unlock_fn: unlock_chan::<T>,
477 try_fn: try_recv_chan::<T>,
478 enqueue_fn: enqueue_recv_chan::<T>,
479 dequeue_fn: dequeue_recv_chan::<T>,
480 }
481}
482
483/// Build a send [`SCase`] for use in [`selectgo`].
484///
485/// `val` must point to a `ManuallyDrop<T>` that outlives the `selectgo` call.
486/// If the case wins, the value is moved into the channel and the caller must
487/// **not** drop `*val`. If the case loses, the caller must call
488/// `ManuallyDrop::drop(val)` to avoid a leak.
489///
490/// Called by the `select!` macro; not intended for direct use.
491#[doc(hidden)]
492pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
493 SCase {
494 chan_ptr: Arc::as_ptr(tx.hchan()) as *const (),
495 sg: ptr::null_mut(),
496 elem: val as *mut u8,
497 lock_fn: lock_chan::<T>,
498 unlock_fn: unlock_chan::<T>,
499 try_fn: try_send_chan::<T>,
500 enqueue_fn: enqueue_send_chan::<T>,
501 dequeue_fn: dequeue_send_chan::<T>,
502 }
503}
504
505// ---------------------------------------------------------------------------
506// Tests
507// ---------------------------------------------------------------------------
508
509#[cfg(all(test, not(loom)))]
510mod tests {
511 use super::*;
512 use crate::chan::{chan, Hchan};
513 use crate::runtime::sudog::Sudog;
514 use crate::runtime::sched::run_impl;
515 use std::ptr;
516 use std::sync::atomic::{AtomicI32, Ordering};
517 use std::sync::Arc;
518
519 // ─── vtable helpers for Hchan<i32> ────────────────────────────────────────
520
521 unsafe fn lock_i32(p: *const ()) {
522 (*(p as *const Hchan<i32>)).mutex.lock();
523 }
524 unsafe fn unlock_i32(p: *const ()) {
525 unsafe { (*(p as *const Hchan<i32>)).mutex.unlock() };
526 }
527
528 /// try_fn for a **send** case on `Hchan<i32>`.
529 ///
530 /// `elem` points to a `i32` to send. Checks recvq and buffer.
531 unsafe fn try_send_i32(p: *const (), elem: *mut u8) -> TryResult {
532 let hchan = &*(p as *const Hchan<i32>);
533 // SAFETY: caller holds the channel lock.
534 let state = &mut *hchan.state.get();
535
536 if state.closed {
537 return TryResult::ClosedSend;
538 }
539
540 // Waiting receiver?
541 let recv_sg = state.recvq.dequeue();
542 if !recv_sg.is_null() {
543 let gp = (*recv_sg).g;
544 let ep = (*recv_sg).elem as *mut Option<i32>;
545 if !ep.is_null() {
546 *ep = Some(ptr::read(elem as *const i32));
547 }
548 (*recv_sg).success = true;
549 (*gp).param = recv_sg as *mut u8;
550 return TryResult::Handoff { gp, ok: true };
551 }
552
553 // Buffer space?
554 if state.buf.len() < state.cap {
555 state.buf.push_back(ptr::read(elem as *const i32));
556 return TryResult::Done { ok: true };
557 }
558
559 TryResult::NotReady
560 }
561
562 /// try_fn for a **recv** case on `Hchan<i32>`.
563 ///
564 /// `elem` points to an `Option<i32>` output slot (initialised to `None`).
565 unsafe fn try_recv_i32(p: *const (), elem: *mut u8) -> TryResult {
566 let hchan = &*(p as *const Hchan<i32>);
567 let state = &mut *hchan.state.get();
568
569 // Waiting sender?
570 let send_sg = state.sendq.dequeue();
571 if !send_sg.is_null() {
572 let gp = (*send_sg).g;
573 let ep = (*send_sg).elem as *mut ManuallyDrop<i32>;
574 let boxed = (*send_sg).boxed_elem;
575 let val = if state.cap == 0 {
576 let v = ManuallyDrop::into_inner(ptr::read(ep));
577 if boxed { let _ = Box::from_raw(ep); }
578 (*send_sg).elem = ptr::null_mut();
579 v
580 } else {
581 let head = state.buf.pop_front().unwrap();
582 let sv = ManuallyDrop::into_inner(ptr::read(ep));
583 if boxed { let _ = Box::from_raw(ep); }
584 (*send_sg).elem = ptr::null_mut();
585 state.buf.push_back(sv);
586 head
587 };
588 *(elem as *mut Option<i32>) = Some(val);
589 (*send_sg).success = true;
590 (*gp).param = send_sg as *mut u8;
591 return TryResult::Handoff { gp, ok: true };
592 }
593
594 // Buffer has data?
595 if !state.buf.is_empty() {
596 let val = state.buf.pop_front().unwrap();
597 *(elem as *mut Option<i32>) = Some(val);
598 return TryResult::Done { ok: true };
599 }
600
601 // Closed and empty → elem stays None.
602 if state.closed {
603 return TryResult::Done { ok: false };
604 }
605
606 TryResult::NotReady
607 }
608
609 unsafe fn enqueue_send_i32(p: *const (), sg: *mut Sudog) {
610 let hchan = &*(p as *const Hchan<i32>);
611 (*hchan.state.get()).sendq.enqueue(sg);
612 }
613 unsafe fn enqueue_recv_i32(p: *const (), sg: *mut Sudog) {
614 let hchan = &*(p as *const Hchan<i32>);
615 (*hchan.state.get()).recvq.enqueue(sg);
616 }
617 unsafe fn dequeue_send_sg_i32(p: *const (), sg: *mut Sudog) {
618 let hchan = &*(p as *const Hchan<i32>);
619 (*hchan.state.get()).sendq.dequeue_sudog(sg);
620 }
621 unsafe fn dequeue_recv_sg_i32(p: *const (), sg: *mut Sudog) {
622 let hchan = &*(p as *const Hchan<i32>);
623 (*hchan.state.get()).recvq.dequeue_sudog(sg);
624 }
625
626 /// Build an `SCase` for a buffered-send of `val` on channel `h`.
627 fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
628 SCase {
629 chan_ptr: Arc::as_ptr(h) as *const (),
630 sg: ptr::null_mut(),
631 elem: val as *mut i32 as *mut u8,
632 lock_fn: lock_i32,
633 unlock_fn: unlock_i32,
634 try_fn: try_send_i32,
635 enqueue_fn: enqueue_send_i32,
636 dequeue_fn: dequeue_send_sg_i32,
637 }
638 }
639
640 /// Build an `SCase` for a recv on channel `h`, output into `slot`.
641 fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut Option<i32>) -> SCase {
642 SCase {
643 chan_ptr: Arc::as_ptr(h) as *const (),
644 sg: ptr::null_mut(),
645 elem: slot as *mut Option<i32> as *mut u8,
646 lock_fn: lock_i32,
647 unlock_fn: unlock_i32,
648 try_fn: try_recv_i32,
649 enqueue_fn: enqueue_recv_i32,
650 dequeue_fn: dequeue_recv_sg_i32,
651 }
652 }
653
654 // ── Fast-path tests (no goroutine park) ───────────────────────────────────
655
656 /// select { rx.recv() => ... ; default } on a buffered channel with data.
657 #[test]
658 fn fast_recv_buffered() {
659 run_impl(|| {
660 let (tx, rx) = chan::<i32>(4);
661 tx.send(42);
662
663 let mut slot: Option<i32> = None;
664 let mut cases = [recv_case(rx.hchan(), &mut slot)];
665 let (idx, ok) = selectgo(&mut cases, true);
666
667 assert_eq!(idx, 0, "should pick recv case");
668 assert!(ok, "should be ok (not closed)");
669 assert_eq!(slot.unwrap(), 42);
670 });
671 }
672
673 /// select { tx.send(v) => ... ; default } on a channel with buffer space.
674 #[test]
675 fn fast_send_buffered() {
676 run_impl(|| {
677 let (tx, rx) = chan::<i32>(4);
678
679 let mut val = 99_i32;
680 let mut cases = [send_case(tx.hchan(), &mut val)];
681 let (idx, ok) = selectgo(&mut cases, true);
682
683 assert_eq!(idx, 0);
684 assert!(ok, "buffered send completes with ok=true");
685 assert_eq!(rx.recv(), Some(99));
686 });
687 }
688
689 /// select { ... ; default } when no case is ready → default taken.
690 #[test]
691 fn default_taken_when_not_ready() {
692 run_impl(|| {
693 let (_tx, rx) = chan::<i32>(0);
694
695 let mut slot: Option<i32> = None;
696 let mut cases = [recv_case(rx.hchan(), &mut slot)];
697 let (idx, ok) = selectgo(&mut cases, true);
698
699 assert_eq!(idx, CASE_DEFAULT);
700 assert!(!ok);
701 });
702 }
703
704 /// select recv on closed+empty channel returns ok=false.
705 #[test]
706 fn recv_closed_empty() {
707 run_impl(|| {
708 let (tx, rx) = chan::<i32>(0);
709 tx.close();
710
711 let mut slot: Option<i32> = None;
712 let mut cases = [recv_case(rx.hchan(), &mut slot)];
713 let (idx, ok) = selectgo(&mut cases, false);
714
715 assert_eq!(idx, 0);
716 assert!(!ok, "recv from closed returns ok=false");
717 assert!(slot.is_none(), "closed recv slot must stay None");
718 });
719 }
720
721 // ── Multi-case selection ──────────────────────────────────────────────────
722
723 /// Two recv cases; only one channel has data — that case wins.
724 #[test]
725 fn multi_case_first_ready_wins() {
726 run_impl(|| {
727 let (tx1, rx1) = chan::<i32>(1);
728 let (_tx2, rx2) = chan::<i32>(1);
729
730 tx1.send(7);
731
732 let mut s1: Option<i32> = None;
733 let mut s2: Option<i32> = None;
734 let mut cases = [
735 recv_case(rx1.hchan(), &mut s1),
736 recv_case(rx2.hchan(), &mut s2),
737 ];
738 let (idx, ok) = selectgo(&mut cases, false);
739
740 assert_eq!(idx, 0);
741 assert!(ok);
742 assert_eq!(s1.unwrap(), 7);
743 });
744 }
745
746 // ── Blocking path tests (goroutine park/unpark) ───────────────────────────
747
748 /// Goroutine blocks on select recv, then a sender unblocks it.
749 #[test]
750 fn blocking_recv_unblocked_by_send() {
751 use crate::runtime::sched::spawn_goroutine;
752
753 let result = Arc::new(AtomicI32::new(-1));
754 let result2 = Arc::clone(&result);
755
756 run_impl(move || {
757 let (tx, rx) = chan::<i32>(0);
758
759 spawn_goroutine(move || {
760 // Sender: wait a bit, then send.
761 crate::gosched();
762 tx.send(55);
763 });
764
765 let mut slot: Option<i32> = None;
766 let mut cases = [recv_case(rx.hchan(), &mut slot)];
767 // No default → will block.
768 let (idx, ok) = selectgo(&mut cases, false);
769
770 assert_eq!(idx, 0);
771 assert!(ok);
772 result2.store(slot.unwrap(), Ordering::Relaxed);
773 });
774
775 assert_eq!(result.load(Ordering::Acquire), 55);
776 }
777
778 /// Goroutine blocks on select send, then a receiver unblocks it.
779 #[test]
780 fn blocking_send_unblocked_by_recv() {
781 use crate::runtime::sched::spawn_goroutine;
782
783 run_impl(|| {
784 let (tx, rx) = chan::<i32>(0);
785
786 spawn_goroutine(move || {
787 crate::gosched();
788 // Consume the value the select sends.
789 let _ = rx.recv();
790 });
791
792 let mut val = 77_i32;
793 let mut cases = [send_case(tx.hchan(), &mut val)];
794 let (idx, _ok) = selectgo(&mut cases, false);
795
796 assert_eq!(idx, 0);
797 });
798 }
799
800 /// Two goroutines racing on the same channel; exactly one wins via select.
801 #[test]
802 fn select_race_one_winner() {
803 use crate::runtime::sched::spawn_goroutine;
804
805 let wins = Arc::new(AtomicI32::new(0));
806 let wins2 = Arc::clone(&wins);
807 let wins3 = Arc::clone(&wins);
808
809 run_impl(move || {
810 let (tx, rx) = chan::<i32>(1);
811 tx.send(1); // one value in the buffer
812
813 spawn_goroutine({
814 let wins = Arc::clone(&wins2);
815 let rx = rx.clone();
816 move || {
817 let mut slot: Option<i32> = None;
818 let mut cases = [recv_case(rx.hchan(), &mut slot)];
819 let (idx, ok) = selectgo(&mut cases, true);
820 if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
821 }
822 });
823
824 spawn_goroutine({
825 let wins = Arc::clone(&wins3);
826 let rx = rx.clone();
827 move || {
828 let mut slot: Option<i32> = None;
829 let mut cases = [recv_case(rx.hchan(), &mut slot)];
830 let (idx, ok) = selectgo(&mut cases, true);
831 if idx == 0 && ok { wins.fetch_add(1, Ordering::Relaxed); }
832 }
833 });
834
835 // Give goroutines time to race.
836 for _ in 0..200 { crate::gosched(); }
837 });
838
839 // Exactly one goroutine should have received the value.
840 assert_eq!(wins.load(Ordering::Acquire), 1);
841 }
842}