Skip to main content

go_lib/
chan.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Channels — ported from `src/runtime/chan.go`.
3//!
4//! Buffered and unbuffered channels backed by the same G/M/P scheduler used by
5//! goroutines.  A goroutine that blocks on a channel send or receive is parked
6//! via `gopark` and resumed via `goready`; no OS thread is ever blocked.
7//!
8//! ## Public surface
9//!
10//! ```no_run
11//! let (tx, rx) = go_lib::chan::chan::<i32>(0);   // unbuffered
12//! let (tx, rx) = go_lib::chan::chan::<i32>(16);  // buffered, capacity 16
13//!
14//! tx.send(42_i32);
15//! let v = rx.recv();  // Some(42); None means closed + empty
16//! tx.close();
17//! ```
18//!
19//! ## Internals
20//!
21//! Each channel is an `Arc<Hchan<T>>`.  The lock-protected interior holds a
22//! `VecDeque<T>` ring buffer plus two wait queues (`sendq` / `recvq`) of
23//! `Sudog` records (one per blocked goroutine).
24//!
25//! Locking uses `RawMutex` (not `std::sync::Mutex`) so that `selectgo` can
26//! hold multiple heterogeneous channel locks simultaneously without needing a
27//! typed `MutexGuard<HchanState<T>>` for each one.
28//!
29//! ### Blocking protocol
30//!
31//! When a goroutine must block it:
32//! 1. Allocates a `Sudog` from the pool.
33//! 2. Heap-allocates a `ManuallyDrop<T>` (send) or `Option<T>` (recv) as the
34//!    value staging area (`sudog.elem`).
35//! 3. Enqueues the sudog in `sendq` / `recvq` (under the channel lock).
36//! 4. Releases the lock.
37//! 5. Calls `gopark` — `park_fn` sets `GWAITING` on g0's stack.
38//!
39//! The goroutine that completes the operation:
40//! - Reads or writes through `sudog.elem`.
41//! - Sets `sudog.success` and `(*gp).param = sudog as *mut u8`.
42//! - Calls `goready`, which spins until the target is `GWAITING` before
43//!   marking it `GRUNNABLE`.
44//!
45//! ### Close semantics (matches Go)
46//!
47//! - Sending on a closed channel **panics**.
48//! - Receiving from a closed empty channel returns `None`.
49//! - Closing an already-closed channel **panics**.
50//!
51//! Ported from `hchan`, `chansend`, `chanrecv`, `closechan` in
52//! `runtime/chan.go`.
53
54use std::cell::UnsafeCell;
55use std::collections::VecDeque;
56use std::mem::ManuallyDrop;
57use std::ptr;
58use std::sync::Arc;
59
60use crate::runtime::g::{current_g, WaitReason};
61use crate::runtime::park::{gopark_commit, goready};
62use crate::runtime::rawmutex::{LockGuard, RawMutex};
63use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};
64
65// ---------------------------------------------------------------------------
66// Hchan — the heap channel object
67// ---------------------------------------------------------------------------
68
69/// Lock-protected interior of a channel.
70pub(crate) struct HchanState<T> {
71    /// Buffered elements waiting to be received (FIFO).
72    pub(crate) buf:    VecDeque<T>,
73    /// Buffer capacity (0 = unbuffered / synchronous).
74    pub(crate) cap:    usize,
75    /// True after `close()`.
76    pub(crate) closed: bool,
77    /// Goroutines blocked in `send` (buffer full or unbuffered with no receiver).
78    pub(crate) sendq:  WaitQ,
79    /// Goroutines blocked in `recv` (buffer empty or unbuffered with no sender).
80    pub(crate) recvq:  WaitQ,
81}
82
83impl<T> HchanState<T> {
84    fn new(cap: usize) -> Self {
85        Self {
86            buf:    VecDeque::with_capacity(cap),
87            cap,
88            closed: false,
89            sendq:  WaitQ::new(),
90            recvq:  WaitQ::new(),
91        }
92    }
93}
94
95/// The channel heap object, shared via `Arc` between all `Sender`/`Receiver`
96/// clones.
97///
98/// `pub(crate)` so that `selectgo` (step 14) can access the state directly.
99///
100/// The `mutex` field is first so that `Arc::as_ptr(h) as *const RawMutex` gives
101/// a stable address suitable for address-ordered lock acquisition in `selectgo`.
102///
103/// `#[repr(C)]` is **load-bearing**: `selectgo` locks a channel via
104/// `Arc::as_ptr(h) as *const RawMutex`, i.e. it assumes `mutex` lives at
105/// offset 0.  Without `repr(C)`, Rust's default layout reorders fields by
106/// alignment and puts the (8-aligned) `state` at offset 0 and the 1-byte
107/// `mutex` at the end — so `selectgo` would spin on bytes inside `state.buf`
108/// instead of the real lock, never excluding live channel operations and
109/// corrupting the wait queues (observed as SIGSEGV/SIGBUS and lost-wakeup
110/// hangs under select stress).
111#[repr(C)]
112pub(crate) struct Hchan<T> {
113    /// Raw adaptive spinlock protecting `state`.
114    ///
115    /// Exposed `pub(crate)` so `selectgo` can lock/unlock multiple heterogeneous
116    /// channels without needing typed `MutexGuard` storage.
117    pub(crate) mutex: RawMutex,
118    /// Interior state — always accessed under `mutex`.
119    pub(crate) state: UnsafeCell<HchanState<T>>,
120}
121
122unsafe impl<T: Send> Send for Hchan<T> {}
123unsafe impl<T: Send> Sync for Hchan<T> {}
124
125impl<T> Hchan<T> {
126    pub(crate) fn new(cap: usize) -> Self {
127        Self {
128            mutex: RawMutex::new(),
129            state: UnsafeCell::new(HchanState::new(cap)),
130        }
131    }
132
133    /// Acquire the lock and return a guard + mutable state reference.
134    ///
135    /// The guard releases the lock when dropped.  Drop it *before* calling
136    /// `gopark` so the scheduler can't see the lock still held.
137    ///
138    /// # Safety
139    /// The returned `&mut HchanState<T>` must not be used after the guard is
140    /// dropped (the lock no longer protects access).
141    #[allow(clippy::mut_from_ref)] // intentional: state is behind UnsafeCell
142    pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
143        let g = LockGuard::new(&self.mutex);
144        // SAFETY: We just acquired the lock; no other thread holds a reference.
145        let s = unsafe { &mut *self.state.get() };
146        (g, s)
147    }
148}
149
150// ---------------------------------------------------------------------------
151// Public channel halves
152// ---------------------------------------------------------------------------
153
154/// The sending half of a channel.  Cheap to `clone`.
155pub struct Sender<T>(Arc<Hchan<T>>);
156
157/// The receiving half of a channel.  Cheap to `clone`.
158pub struct Receiver<T>(Arc<Hchan<T>>);
159
160impl<T> Clone for Sender<T>   { fn clone(&self) -> Self { Sender(Arc::clone(&self.0))   } }
161impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }
162
163unsafe impl<T: Send> Send for Sender<T>   {}
164unsafe impl<T: Send> Sync for Sender<T>   {}
165unsafe impl<T: Send> Send for Receiver<T> {}
166unsafe impl<T: Send> Sync for Receiver<T> {}
167
168/// Create a new channel with the given buffer capacity.
169///
170/// `cap == 0` gives an unbuffered (synchronous rendezvous) channel; `cap > 0`
171/// gives a buffered channel that holds up to `cap` values without blocking the
172/// sender.
173///
174/// Returns `(Sender<T>, Receiver<T>)`.
175pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
176    let h = Arc::new(Hchan::new(cap));
177    (Sender(Arc::clone(&h)), Receiver(h))
178}
179
180impl<T: Send + 'static> Sender<T> {
181    /// Send `val`, blocking until a receiver is ready or buffer space opens.
182    ///
183    /// # Panics
184    /// Panics if the channel has been closed.
185    pub fn send(&self, val: T) {
186        unsafe { chansend(&self.0, val, true) };
187    }
188
189    /// Non-blocking send.  Returns `false` if the buffer is full or there is
190    /// no waiting receiver.  Panics if the channel is closed.
191    pub fn try_send(&self, val: T) -> bool {
192        unsafe { chansend(&self.0, val, false) }
193    }
194
195    /// Close the channel.  Panics if already closed.
196    pub fn close(&self) {
197        unsafe { closechan(&self.0) };
198    }
199
200    /// Raw access to the underlying `Hchan` for use by `selectgo`.
201    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
202}
203
204impl<T: Send + 'static> Receiver<T> {
205    /// Receive a value, blocking until one is available or the channel closes.
206    ///
207    /// Returns `Some(val)` on success, `None` if the channel is closed and
208    /// the buffer is fully drained.
209    pub fn recv(&self) -> Option<T> {
210        unsafe { chanrecv(&self.0, true) }
211    }
212
213    /// Non-blocking receive.
214    ///
215    /// - `Some(Some(val))` — received.
216    /// - `Some(None)`      — channel closed and empty.
217    /// - `None`            — would block (nothing ready yet).
218    pub fn try_recv(&self) -> Option<Option<T>> {
219        unsafe { chanrecv_nb(&self.0) }
220    }
221
222    /// Raw access to the underlying `Hchan` for use by `selectgo`.
223    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
224}
225
226// ---------------------------------------------------------------------------
227// chansend
228// ---------------------------------------------------------------------------
229
230/// Send `val` to `c`.
231///
232/// `block = true`  → park the goroutine if the channel has no space.
233/// `block = false` → return `false` immediately if the channel has no space.
234///
235/// # Safety
236/// Must be called from a goroutine (not g0 or an OS-thread main function).
237///
238/// Ported from `chansend` in `runtime/chan.go`.
239pub(crate) unsafe fn chansend<T: Send + 'static>(
240    c:     &Arc<Hchan<T>>,
241    val:   T,
242    block: bool,
243) -> bool {
244    // SAFETY: we hold the lock for the duration of the guard's scope.
245    let (_g, state) = unsafe { c.lock_state() };
246
247    if state.closed {
248        drop(_g);
249        panic!("send on closed channel");
250    }
251
252    // ── Case 1: direct handoff to a waiting receiver ─────────────────────────
253    let recv_sg = unsafe { state.recvq.dequeue() };
254    if !recv_sg.is_null() {
255        // We hold the channel lock across the dequeue, the `*elem_ptr` write
256        // (which, for a select receiver, lands in that goroutine's `selectgo`
257        // stack frame), and `goready`.  The receiver cannot resume and unwind
258        // that frame until it dequeues its own sudog, which needs this same
259        // channel lock — so the stack we write into stays valid for the write.
260        // The G descriptor itself is reused via the gFree pool (never freed
261        // while parked), so the `(*gp)` dereferences below are always safe.
262        let gp       = unsafe { (*recv_sg).g };
263        // recv_sg.elem points to Option<T> (allocated by chanrecv or selectgo).
264        let elem_ptr = unsafe { (*recv_sg).elem as *mut Option<T> };
265        if !elem_ptr.is_null() {
266            unsafe { *elem_ptr = Some(val) };
267        }
268        unsafe {
269            (*recv_sg).success = true;
270            (*gp).param        = recv_sg as *mut u8;
271        }
272        drop(_g);
273        unsafe { goready(gp) };
274        return true;
275    }
276
277    // ── Case 2: buffer has space ──────────────────────────────────────────────
278    if state.buf.len() < state.cap {
279        state.buf.push_back(val);
280        return true;
281    }
282
283    // ── Case 3: non-blocking — cannot proceed ────────────────────────────────
284    if !block {
285        return false;
286    }
287
288    // ── Case 4: block — enqueue this goroutine as a waiting sender ───────────
289    let gp = current_g();
290    debug_assert!(!gp.is_null(), "chansend: called from g0");
291
292    // Box<ManuallyDrop<T>> — the receiver moves the value out and frees the box.
293    let elem_ptr = Box::into_raw(Box::new(ManuallyDrop::new(val))) as *mut u8;
294
295    let s = acquire_sudog();
296    unsafe {
297        (*s).g          = gp;
298        (*s).elem       = elem_ptr;
299        (*s).boxed_elem = true; // Box<ManuallyDrop<T>> — must be freed by receiver
300        (*s).success    = false;
301        (*s).c          = Arc::as_ptr(c) as *mut u8;
302        (*gp).param     = ptr::null_mut();
303        state.sendq.enqueue(s);
304    }
305
306    // Commit-park protocol: keep the channel lock held until park_fn has
307    // transitioned this goroutine to GWAITING (released on g0).  Releasing
308    // before the park opens a window where a receiver consumes the sudog and
309    // calls goready while we are still GRUNNING/GRUNNABLE — the wake is then
310    // lost and this goroutine parks forever (observed under SIGURG storms).
311    let mutex_ptr = _g.into_locked_raw();
312    unsafe { gopark_commit(WaitReason::ChanSend, unlock_chan_mutex, mutex_ptr as *mut u8) };
313
314    // ── Resumed: inspect outcome ─────────────────────────────────────────────
315    let ok = unsafe {
316        let s2 = (*gp).param as *mut Sudog;
317        (*gp).param = ptr::null_mut();
318        let ok = (*s2).success;
319
320        if !ok && !(*s2).elem.is_null() {
321            // Send was rejected (channel closed after we parked).
322            // elem points to ManuallyDrop<T> — drop the value, then free the box.
323            let ep = (*s2).elem as *mut ManuallyDrop<T>;
324            (*s2).elem = ptr::null_mut();
325            ManuallyDrop::drop(&mut *ep); // run T's destructor
326            if (*s2).boxed_elem { let _ = Box::from_raw(ep); }
327        }
328        (*s2).g = ptr::null_mut();
329        (*s2).c = ptr::null_mut();
330        release_sudog(s2);
331        ok
332    };
333
334    if !ok {
335        panic!("send on closed channel");
336    }
337    true
338}
339
340// ---------------------------------------------------------------------------
341// chanrecv
342// ---------------------------------------------------------------------------
343
344/// Receive from `c`.
345///
346/// `block = true`  → park until a value or close.
347/// `block = false` → return `None` immediately if nothing is ready.
348///
349/// Returns `Some(val)` on success or `None` for closed-and-empty / would-block.
350///
351/// # Safety
352/// Must be called from a goroutine (not g0 or an OS-thread main function).
353///
354/// Ported from `chanrecv` in `runtime/chan.go`.
355pub(crate) unsafe fn chanrecv<T: Send + 'static>(
356    c:     &Arc<Hchan<T>>,
357    block: bool,
358) -> Option<T> {
359    let (_g, state) = unsafe { c.lock_state() };
360
361    // ── Case 1: direct handoff from a waiting sender ─────────────────────────
362    let send_sg = unsafe { state.sendq.dequeue() };
363    if !send_sg.is_null() {
364        let val = recv_from_sender(state, send_sg);
365        drop(_g);
366        return Some(val);
367    }
368
369    // ── Case 2: buffer has data ───────────────────────────────────────────────
370    if !state.buf.is_empty() {
371        return Some(state.buf.pop_front().unwrap());
372    }
373
374    // ── Case 3: closed and empty ──────────────────────────────────────────────
375    if state.closed {
376        return None;
377    }
378
379    // ── Case 4: non-blocking — nothing ready ─────────────────────────────────
380    if !block {
381        return None;
382    }
383
384    // ── Case 5: block — enqueue as a waiting receiver ────────────────────────
385    let gp = current_g();
386    debug_assert!(!gp.is_null(), "chanrecv: called from g0");
387
388    // Box<Option<T>> — the sender writes Some(val) into this slot, or it
389    // stays None if the channel closes.  Freed on wakeup.
390    let elem_ptr = Box::into_raw(Box::new(None::<T>)) as *mut u8;
391
392    let s = acquire_sudog();
393    unsafe {
394        (*s).g          = gp;
395        (*s).elem       = elem_ptr;
396        (*s).boxed_elem = true; // Box<Option<T>> — must be freed on wakeup
397        (*s).success    = false;
398        (*s).c          = Arc::as_ptr(c) as *mut u8;
399        (*gp).param     = ptr::null_mut();
400        state.recvq.enqueue(s);
401    }
402
403    // Commit-park protocol — see chansend for the lost-wakeup rationale.
404    let mutex_ptr = _g.into_locked_raw();
405    unsafe { gopark_commit(WaitReason::ChanReceive, unlock_chan_mutex, mutex_ptr as *mut u8) };
406
407    // ── Resumed: read outcome ─────────────────────────────────────────────────
408    unsafe {
409        let s2 = (*gp).param as *mut Sudog;
410        (*gp).param = ptr::null_mut();
411        let ok = (*s2).success;
412
413        let boxed = (*s2).boxed_elem;
414        // elem points to Option<T>: Some(val) if sender delivered, None if closed.
415        let result = if ok {
416            debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
417            let ep = (*s2).elem as *mut Option<T>;
418            (*s2).elem = ptr::null_mut();
419            // ptr::read bitwise-copies the Option<T> out of *ep.  The allocation
420            // at `ep` still holds the original bytes, but ownership of T has been
421            // transferred to `val` — running Option<T>'s destructor on *ep would
422            // double-drop T.  Cast to ManuallyDrop<Option<T>> to free the Box
423            // allocation without triggering Option<T>::drop().
424            let val = ptr::read(ep).expect("chanrecv: elem was None on success");
425            if boxed {
426                let _ = Box::from_raw(ep as *mut std::mem::ManuallyDrop<Option<T>>);
427            }
428            Some(val)
429        } else {
430            if !(*s2).elem.is_null() {
431                // Channel was closed — the slot contains None (T was never written).
432                // Dropping None does not touch any T; no ManuallyDrop needed.
433                let ep = (*s2).elem as *mut Option<T>;
434                (*s2).elem = ptr::null_mut();
435                if boxed { let _ = Box::from_raw(ep); }
436            }
437            None
438        };
439
440        (*s2).g = ptr::null_mut();
441        (*s2).c = ptr::null_mut();
442        release_sudog(s2);
443        result
444    }
445}
446
447/// Non-blocking receive.
448///
449/// Returns:
450/// - `Some(Some(v))` — value received.
451/// - `Some(None)`    — channel closed and empty.
452/// - `None`          — would block (channel has nothing ready right now).
453///
454/// # Safety
455/// May be called outside the scheduler as long as the blocking path is never
456/// triggered.
457pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
458    c: &Arc<Hchan<T>>,
459) -> Option<Option<T>> {
460    let (_g, state) = unsafe { c.lock_state() };
461
462    let send_sg = unsafe { state.sendq.dequeue() };
463    if !send_sg.is_null() {
464        let val = recv_from_sender(state, send_sg);
465        drop(_g);
466        return Some(Some(val));
467    }
468
469    if !state.buf.is_empty() {
470        return Some(Some(state.buf.pop_front().unwrap()));
471    }
472
473    if state.closed {
474        return Some(None);
475    }
476
477    None
478}
479
480/// `gopark_commit` unlock shim: release a channel's `RawMutex` from g0 after
481/// the parking goroutine has reached `GWAITING`.
482///
483/// # Safety
484/// `arg` must be the channel's `&RawMutex` (as produced by
485/// `LockGuard::into_locked_raw`), currently held by the parking goroutine.
486unsafe fn unlock_chan_mutex(arg: *mut u8) {
487    unsafe { (*(arg as *const RawMutex)).unlock() }
488}
489
490/// Receive from a **dequeued** sender sudog and wake the sender.
491///
492/// For unbuffered channels (`cap == 0`): value is moved directly from the
493/// sender's staging box.
494/// For buffered channels (always full when a sender is queued): take the head
495/// of the buffer, rotate the sender's value into the tail.
496///
497/// **Caller must release the channel lock after this returns**, before the
498/// woken goroutine can be scheduled.
499///
500/// Ported from `recv` in `runtime/chan.go`.
501fn recv_from_sender<T: Send + 'static>(
502    state:   &mut HchanState<T>,
503    send_sg: *mut Sudog,
504) -> T {
505    // The caller holds the channel lock across the dequeue of `send_sg`, the
506    // `ptr::read` of `send_sg.elem` below (which, for a select sender, reads
507    // out of that goroutine's `selectgo` stack frame), and `goready`.  The
508    // sender cannot resume and unwind that frame until it dequeues its own
509    // sudog, which needs this same channel lock — so the sender's stack stays
510    // valid for this read.  The G descriptor is reused via the gFree pool
511    // (never freed while parked), so `(*gp)` is safe.
512    let gp = unsafe { (*send_sg).g };
513
514    let boxed = unsafe { (*send_sg).boxed_elem };
515
516    // send_sg.elem points to ManuallyDrop<T> (both chansend and selectgo use
517    // ManuallyDrop semantics — same memory layout as T, no destructor).
518    let val = if state.cap == 0 {
519        let ep = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
520        let v  = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
521        unsafe {
522            if boxed { let _ = Box::from_raw(ep); }
523            (*send_sg).elem = ptr::null_mut();
524        }
525        v
526    } else {
527        let head = state.buf.pop_front().unwrap();
528        let ep   = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
529        let sv   = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
530        unsafe {
531            if boxed { let _ = Box::from_raw(ep); }
532            (*send_sg).elem = ptr::null_mut();
533        }
534        state.buf.push_back(sv);
535        head
536    };
537
538    unsafe {
539        (*send_sg).success = true;
540        (*gp).param        = send_sg as *mut u8;
541    }
542    unsafe { goready(gp) };
543    val
544}
545
546// ---------------------------------------------------------------------------
547// closechan
548// ---------------------------------------------------------------------------
549
550/// Close `c`.
551///
552/// Marks the channel closed, drains all waiting receivers (they get `None`)
553/// and senders (they panic), and wakes all of them.
554///
555/// # Panics
556/// Panics if the channel is already closed.
557///
558/// # Safety
559/// Must be called from a goroutine (not g0 / OS-thread main).
560///
561/// Ported from `closechan` in `runtime/chan.go`.
562pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
563    let (_g, state) = unsafe { c.lock_state() };
564
565    if state.closed {
566        drop(_g);
567        panic!("close of closed channel");
568    }
569    state.closed = true;
570
571    // `closechan` never touches any waiter's stack: it only writes `success`
572    // and `param`, both fields of the `Sudog`/`G` descriptors, and then
573    // `goready`s each waiter (which dereferences only the `G`).  Those
574    // descriptors stay live while the waiter is parked, so there is no
575    // stack-reclamation hazard here.
576    let mut wakeup: Vec<*mut crate::runtime::g::G> = Vec::new();
577
578    loop {
579        let sg = unsafe { state.recvq.dequeue() };
580        if sg.is_null() { break; }
581        let gp = unsafe { (*sg).g };
582        unsafe {
583            (*sg).success = false;
584            (*gp).param   = sg as *mut u8;
585        }
586        wakeup.push(gp);
587    }
588
589    loop {
590        let sg = unsafe { state.sendq.dequeue() };
591        if sg.is_null() { break; }
592        let gp = unsafe { (*sg).g };
593        unsafe {
594            (*sg).success = false;
595            (*gp).param   = sg as *mut u8;
596        }
597        wakeup.push(gp);
598    }
599
600    drop(_g);
601
602    for gp in wakeup {
603        unsafe { goready(gp) };
604    }
605}
606
607// ---------------------------------------------------------------------------
608// Tests
609// ---------------------------------------------------------------------------
610
611#[cfg(all(test, not(loom)))]
612mod tests {
613    use super::*;
614    use crate::runtime::sched::run_impl;
615    use std::sync::atomic::{AtomicI32, Ordering};
616    use std::sync::Arc;
617
618    // ── Buffered: fast paths (no goroutine park) ──────────────────────────────
619
620    /// Single send + recv completes without blocking.
621    #[test]
622    fn buffered_send_recv() {
623        run_impl(|| {
624            let (tx, rx) = chan::<i32>(1);
625            tx.send(42);
626            assert_eq!(rx.recv(), Some(42));
627        });
628    }
629
630    /// Values arrive in FIFO order.
631    #[test]
632    fn buffered_fifo_order() {
633        run_impl(|| {
634            let (tx, rx) = chan::<i32>(4);
635            for i in 0..4_i32 { tx.send(i); }
636            for i in 0..4_i32 { assert_eq!(rx.recv(), Some(i)); }
637        });
638    }
639
640    /// Close drains buffered values, then recv returns None.
641    #[test]
642    fn buffered_close_drains_then_none() {
643        run_impl(|| {
644            let (tx, rx) = chan::<i32>(2);
645            tx.send(1);
646            tx.send(2);
647            tx.close();
648            assert_eq!(rx.recv(), Some(1));
649            assert_eq!(rx.recv(), Some(2));
650            assert_eq!(rx.recv(), None);
651            assert_eq!(rx.recv(), None); // idempotent
652        });
653    }
654
655    // ── Non-blocking ops ──────────────────────────────────────────────────────
656
657    /// try_recv on an empty open channel returns None (would block).
658    #[test]
659    fn try_recv_empty() {
660        run_impl(|| {
661            let (_tx, rx) = chan::<i32>(4);
662            assert_eq!(rx.try_recv(), None);
663        });
664    }
665
666    /// try_recv on a closed empty channel returns Some(None).
667    #[test]
668    fn try_recv_closed_empty() {
669        run_impl(|| {
670            let (tx, rx) = chan::<i32>(4);
671            tx.close();
672            assert_eq!(rx.try_recv(), Some(None));
673        });
674    }
675
676    /// try_send to a full channel returns false.
677    #[test]
678    fn try_send_full() {
679        run_impl(|| {
680            let (tx, _rx) = chan::<i32>(2);
681            assert!(tx.try_send(1));
682            assert!(tx.try_send(2));
683            assert!(!tx.try_send(3));
684        });
685    }
686
687    // ── Panic paths ───────────────────────────────────────────────────────────
688    //
689    // These don't exercise goroutine parking — the panic must unwind back to
690    // the test thread's #[should_panic] handler, so we must NOT wrap in run_impl.
691
692    /// Closing an already-closed channel panics.
693    #[test]
694    #[should_panic(expected = "close of closed channel")]
695    fn close_twice_panics() {
696        let (tx, _rx) = chan::<i32>(1);
697        tx.close();
698        tx.close();
699    }
700
701    /// Sending on a closed channel panics.
702    #[test]
703    #[should_panic(expected = "send on closed channel")]
704    fn send_on_closed_panics() {
705        let (tx, _rx) = chan::<i32>(1);
706        tx.close();
707        tx.send(1);
708    }
709
710    // ── Goroutine rendezvous (exercises park/unpark) ──────────────────────────
711
712    /// Unbuffered send and recv across two goroutines.
713    #[test]
714    fn unbuffered_rendezvous() {
715        use crate::runtime::sched::spawn_goroutine;
716
717        run_impl(|| {
718            let (tx, rx) = chan::<i32>(0);
719            spawn_goroutine(move || { tx.send(99); });
720            assert_eq!(rx.recv(), Some(99));
721        });
722    }
723
724    /// Ping-pong ten rounds across two goroutines.
725    #[test]
726    fn unbuffered_ping_pong() {
727        use crate::runtime::sched::spawn_goroutine;
728
729        run_impl(|| {
730            let (ping_tx, ping_rx) = chan::<i32>(0);
731            let (pong_tx, pong_rx) = chan::<i32>(0);
732
733            spawn_goroutine(move || {
734                for _ in 0..10 {
735                    let v = ping_rx.recv().unwrap();
736                    pong_tx.send(v + 1);
737                }
738            });
739
740            let mut n = 0_i32;
741            for _ in 0..10 {
742                ping_tx.send(n);
743                n = pong_rx.recv().unwrap();
744            }
745            assert_eq!(n, 10);
746        });
747    }
748
749    /// Buffered producer/consumer: 20 values summed by a goroutine.
750    #[test]
751    fn producer_consumer() {
752        use crate::runtime::sched::spawn_goroutine;
753
754        const N: i32 = 20;
755        let sum = Arc::new(AtomicI32::new(0));
756        let sum2 = Arc::clone(&sum);
757
758        run_impl(move || {
759            let (tx, rx) = chan::<i32>(4);
760            let sum3 = Arc::clone(&sum2);
761
762            spawn_goroutine(move || {
763                for i in 0..N { tx.send(i); }
764                tx.close();
765            });
766
767            spawn_goroutine(move || {
768                while let Some(v) = rx.recv() {
769                    sum3.fetch_add(v, Ordering::Relaxed);
770                }
771            });
772
773            // A wall-clock deadline is robust across CI runner speeds and
774            // build profiles (debug, coverage/nightly) where frame sizes and
775            // instrumentation overhead can make goroutines run much slower.
776            let expected = N * (N - 1) / 2;
777            let deadline =
778                std::time::Instant::now() + std::time::Duration::from_secs(5);
779            while sum2.load(Ordering::Acquire) != expected
780                && std::time::Instant::now() < deadline
781            {
782                crate::gosched();
783            }
784        });
785
786        assert_eq!(sum.load(Ordering::Acquire), N * (N - 1) / 2);
787    }
788    
789    /// multi-producer, single consumer: 10 producers summing 10 values
790    #[test]
791    fn multi_producer_single_consumer() {
792        use crate::runtime::sched::spawn_goroutine;
793        use crate::sync::WaitGroup;
794        
795        const N: i32 = 10;
796        let total = N*N;
797        let sum = Arc::new(AtomicI32::new(0));
798        let sum2 = Arc::clone(&sum);
799
800        run_impl(move || {
801            let (tx, rx) = chan::<i32>(4);
802            let sum3 = Arc::clone(&sum2);
803            let wg  = Arc::new(WaitGroup::new());
804
805            for g in 0 .. N {
806                let start = N*g;
807                let end = start+N;
808                let tx_clone = tx.clone();
809                let wg_clone = Arc::clone(&wg);
810                // add(1) must happen before spawn_goroutine so that wg.wait()
811                // cannot return before all producers have registered themselves.
812                wg_clone.add(1);
813                spawn_goroutine(move || {
814                    for i in start..end { tx_clone.send(i); }
815                    wg_clone.done()
816                });
817            }
818            
819            spawn_goroutine(move || {
820                while let Some(v) = rx.recv() {
821                    sum3.fetch_add(v, Ordering::Relaxed);
822                }
823            });
824
825            wg.wait();
826            tx.close();
827            
828            // A wall-clock deadline is robust across CI runner speeds and
829            // build profiles (debug, coverage/nightly) where frame sizes and
830            // instrumentation overhead can make goroutines run much slower.
831            let expected = total * (total - 1) / 2;
832            let deadline =
833                std::time::Instant::now() + std::time::Duration::from_secs(5);
834            while sum2.load(Ordering::Acquire) != expected
835                && std::time::Instant::now() < deadline
836            {
837                crate::gosched();
838            }
839        });
840
841        assert_eq!(sum.load(Ordering::Acquire), total * (total - 1) / 2);
842
843    }
844
845    /// Close wakes a goroutine that is blocked on recv.
846    #[test]
847    fn close_wakes_blocked_receiver() {
848        use crate::runtime::sched::spawn_goroutine;
849
850        let got_none = Arc::new(AtomicI32::new(0));
851        let got2 = Arc::clone(&got_none);
852        let got3 = Arc::clone(&got_none); // checked inside run_impl to bound the post-close loop
853
854        run_impl(move || {
855            let (tx, rx) = chan::<i32>(0);
856
857            spawn_goroutine(move || {
858                // Block on recv until the channel is closed.
859                if rx.recv().is_none() {
860                    got2.fetch_add(1, Ordering::Relaxed);
861                }
862            });
863
864            // Give the spawned goroutine time to start and block on recv.
865            // More iterations than before because parallel test load on CI can
866            // starve goroutines for many scheduler rounds.
867            for _ in 0..500 { crate::gosched(); }
868            tx.close();
869            // Wait for the goroutine to observe the close and record its result.
870            // A wall-clock deadline is robust across CI runner speeds.
871            let deadline =
872                std::time::Instant::now() + std::time::Duration::from_secs(5);
873            while got3.load(Ordering::Acquire) == 0
874                && std::time::Instant::now() < deadline
875            {
876                crate::gosched();
877            }
878        });
879
880        assert_eq!(got_none.load(Ordering::Acquire), 1);
881    }
882}