Skip to main content

go_lib/
chan.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Channels — ported from `src/runtime/chan.go`.
3//!
4//! Buffered and unbuffered channels backed by the same G/M/P scheduler used by
5//! goroutines.  A goroutine that blocks on a channel send or receive is parked
6//! via `gopark` and resumed via `goready`; no OS thread is ever blocked.
7//!
8//! ## Public surface
9//!
10//! ```no_run
11//! let (tx, rx) = go_lib::chan::chan::<i32>(0);   // unbuffered
12//! let (tx, rx) = go_lib::chan::chan::<i32>(16);  // buffered, capacity 16
13//!
14//! tx.send(42_i32);
15//! let v = rx.recv();  // Some(42); None means closed + empty
16//! tx.close();
17//! ```
18//!
19//! ## Internals
20//!
21//! Each channel is an `Arc<Hchan<T>>`.  The lock-protected interior holds a
22//! `VecDeque<T>` ring buffer plus two wait queues (`sendq` / `recvq`) of
23//! `Sudog` records (one per blocked goroutine).
24//!
25//! Locking uses `RawMutex` (not `std::sync::Mutex`) so that `selectgo` can
26//! hold multiple heterogeneous channel locks simultaneously without needing a
27//! typed `MutexGuard<HchanState<T>>` for each one.
28//!
29//! ### Blocking protocol
30//!
31//! When a goroutine must block it:
32//! 1. Allocates a `Sudog` from the pool.
33//! 2. Heap-allocates a `ManuallyDrop<T>` (send) or `Option<T>` (recv) as the
34//!    value staging area (`sudog.elem`).
35//! 3. Enqueues the sudog in `sendq` / `recvq` (under the channel lock).
36//! 4. Releases the lock.
37//! 5. Calls `gopark` — `park_fn` sets `GWAITING` on g0's stack.
38//!
39//! The goroutine that completes the operation:
40//! - Reads or writes through `sudog.elem`.
41//! - Sets `sudog.success` and `(*gp).param = sudog as *mut u8`.
42//! - Calls `goready`, which spins until the target is `GWAITING` before
43//!   marking it `GRUNNABLE`.
44//!
45//! ### Close semantics (matches Go)
46//!
47//! - Sending on a closed channel **panics**.
48//! - Receiving from a closed empty channel returns `None`.
49//! - Closing an already-closed channel **panics**.
50//!
51//! Ported from `hchan`, `chansend`, `chanrecv`, `closechan` in
52//! `runtime/chan.go`.
53
54use std::cell::UnsafeCell;
55use std::collections::VecDeque;
56use std::mem::ManuallyDrop;
57use std::ptr;
58use std::sync::Arc;
59
60use crate::runtime::g::{current_g, WaitReason};
61use crate::runtime::park::{gopark, goready};
62use crate::runtime::rawmutex::{LockGuard, RawMutex};
63use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};
64
65// ---------------------------------------------------------------------------
66// Hchan — the heap channel object
67// ---------------------------------------------------------------------------
68
69/// Lock-protected interior of a channel.
70pub(crate) struct HchanState<T> {
71    /// Buffered elements waiting to be received (FIFO).
72    pub(crate) buf:    VecDeque<T>,
73    /// Buffer capacity (0 = unbuffered / synchronous).
74    pub(crate) cap:    usize,
75    /// True after `close()`.
76    pub(crate) closed: bool,
77    /// Goroutines blocked in `send` (buffer full or unbuffered with no receiver).
78    pub(crate) sendq:  WaitQ,
79    /// Goroutines blocked in `recv` (buffer empty or unbuffered with no sender).
80    pub(crate) recvq:  WaitQ,
81}
82
83impl<T> HchanState<T> {
84    fn new(cap: usize) -> Self {
85        Self {
86            buf:    VecDeque::with_capacity(cap),
87            cap,
88            closed: false,
89            sendq:  WaitQ::new(),
90            recvq:  WaitQ::new(),
91        }
92    }
93}
94
95/// The channel heap object, shared via `Arc` between all `Sender`/`Receiver`
96/// clones.
97///
98/// `pub(crate)` so that `selectgo` (step 14) can access the state directly.
99///
100/// The `mutex` field is first so that `Arc::as_ptr(h) as *const RawMutex` gives
101/// a stable address suitable for address-ordered lock acquisition in `selectgo`.
102pub(crate) struct Hchan<T> {
103    /// Raw adaptive spinlock protecting `state`.
104    ///
105    /// Exposed `pub(crate)` so `selectgo` can lock/unlock multiple heterogeneous
106    /// channels without needing typed `MutexGuard` storage.
107    pub(crate) mutex: RawMutex,
108    /// Interior state — always accessed under `mutex`.
109    pub(crate) state: UnsafeCell<HchanState<T>>,
110}
111
112unsafe impl<T: Send> Send for Hchan<T> {}
113unsafe impl<T: Send> Sync for Hchan<T> {}
114
115impl<T> Hchan<T> {
116    pub(crate) fn new(cap: usize) -> Self {
117        Self {
118            mutex: RawMutex::new(),
119            state: UnsafeCell::new(HchanState::new(cap)),
120        }
121    }
122
123    /// Acquire the lock and return a guard + mutable state reference.
124    ///
125    /// The guard releases the lock when dropped.  Drop it *before* calling
126    /// `gopark` so the scheduler can't see the lock still held.
127    ///
128    /// # Safety
129    /// The returned `&mut HchanState<T>` must not be used after the guard is
130    /// dropped (the lock no longer protects access).
131    #[allow(clippy::mut_from_ref)] // intentional: state is behind UnsafeCell
132    pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
133        let g = LockGuard::new(&self.mutex);
134        // SAFETY: We just acquired the lock; no other thread holds a reference.
135        let s = unsafe { &mut *self.state.get() };
136        (g, s)
137    }
138}
139
140// ---------------------------------------------------------------------------
141// Public channel halves
142// ---------------------------------------------------------------------------
143
144/// The sending half of a channel.  Cheap to `clone`.
145pub struct Sender<T>(Arc<Hchan<T>>);
146
147/// The receiving half of a channel.  Cheap to `clone`.
148pub struct Receiver<T>(Arc<Hchan<T>>);
149
150impl<T> Clone for Sender<T>   { fn clone(&self) -> Self { Sender(Arc::clone(&self.0))   } }
151impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }
152
153unsafe impl<T: Send> Send for Sender<T>   {}
154unsafe impl<T: Send> Sync for Sender<T>   {}
155unsafe impl<T: Send> Send for Receiver<T> {}
156unsafe impl<T: Send> Sync for Receiver<T> {}
157
158/// Create a new channel with the given buffer capacity.
159///
160/// `cap == 0` gives an unbuffered (synchronous rendezvous) channel; `cap > 0`
161/// gives a buffered channel that holds up to `cap` values without blocking the
162/// sender.
163///
164/// Returns `(Sender<T>, Receiver<T>)`.
165pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
166    let h = Arc::new(Hchan::new(cap));
167    (Sender(Arc::clone(&h)), Receiver(h))
168}
169
170impl<T: Send + 'static> Sender<T> {
171    /// Send `val`, blocking until a receiver is ready or buffer space opens.
172    ///
173    /// # Panics
174    /// Panics if the channel has been closed.
175    pub fn send(&self, val: T) {
176        unsafe { chansend(&self.0, val, true) };
177    }
178
179    /// Non-blocking send.  Returns `false` if the buffer is full or there is
180    /// no waiting receiver.  Panics if the channel is closed.
181    pub fn try_send(&self, val: T) -> bool {
182        unsafe { chansend(&self.0, val, false) }
183    }
184
185    /// Close the channel.  Panics if already closed.
186    pub fn close(&self) {
187        unsafe { closechan(&self.0) };
188    }
189
190    /// Raw access to the underlying `Hchan` for use by `selectgo`.
191    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
192}
193
194impl<T: Send + 'static> Receiver<T> {
195    /// Receive a value, blocking until one is available or the channel closes.
196    ///
197    /// Returns `Some(val)` on success, `None` if the channel is closed and
198    /// the buffer is fully drained.
199    pub fn recv(&self) -> Option<T> {
200        unsafe { chanrecv(&self.0, true) }
201    }
202
203    /// Non-blocking receive.
204    ///
205    /// - `Some(Some(val))` — received.
206    /// - `Some(None)`      — channel closed and empty.
207    /// - `None`            — would block (nothing ready yet).
208    pub fn try_recv(&self) -> Option<Option<T>> {
209        unsafe { chanrecv_nb(&self.0) }
210    }
211
212    /// Raw access to the underlying `Hchan` for use by `selectgo`.
213    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
214}
215
216// ---------------------------------------------------------------------------
217// chansend
218// ---------------------------------------------------------------------------
219
220/// Send `val` to `c`.
221///
222/// `block = true`  → park the goroutine if the channel has no space.
223/// `block = false` → return `false` immediately if the channel has no space.
224///
225/// # Safety
226/// Must be called from a goroutine (not g0 or an OS-thread main function).
227///
228/// Ported from `chansend` in `runtime/chan.go`.
229pub(crate) unsafe fn chansend<T: Send + 'static>(
230    c:     &Arc<Hchan<T>>,
231    val:   T,
232    block: bool,
233) -> bool {
234    // SAFETY: we hold the lock for the duration of the guard's scope.
235    let (_g, state) = unsafe { c.lock_state() };
236
237    if state.closed {
238        drop(_g);
239        panic!("send on closed channel");
240    }
241
242    // ── Case 1: direct handoff to a waiting receiver ─────────────────────────
243    let recv_sg = unsafe { state.recvq.dequeue() };
244    if !recv_sg.is_null() {
245        let gp       = unsafe { (*recv_sg).g };
246        // recv_sg.elem points to Option<T> (allocated by chanrecv or selectgo).
247        let elem_ptr = unsafe { (*recv_sg).elem as *mut Option<T> };
248        if !elem_ptr.is_null() {
249            unsafe { *elem_ptr = Some(val) };
250        }
251        unsafe {
252            (*recv_sg).success = true;
253            (*gp).param        = recv_sg as *mut u8;
254        }
255        drop(_g);
256        unsafe { goready(gp) };
257        return true;
258    }
259
260    // ── Case 2: buffer has space ──────────────────────────────────────────────
261    if state.buf.len() < state.cap {
262        state.buf.push_back(val);
263        return true;
264    }
265
266    // ── Case 3: non-blocking — cannot proceed ────────────────────────────────
267    if !block {
268        return false;
269    }
270
271    // ── Case 4: block — enqueue this goroutine as a waiting sender ───────────
272    let gp = current_g();
273    debug_assert!(!gp.is_null(), "chansend: called from g0");
274
275    // Box<ManuallyDrop<T>> — the receiver moves the value out and frees the box.
276    let elem_ptr = Box::into_raw(Box::new(ManuallyDrop::new(val))) as *mut u8;
277
278    let s = acquire_sudog();
279    unsafe {
280        (*s).g          = gp;
281        (*s).elem       = elem_ptr;
282        (*s).boxed_elem = true; // Box<ManuallyDrop<T>> — must be freed by receiver
283        (*s).success    = false;
284        (*s).c          = Arc::as_ptr(c) as *mut u8;
285        (*gp).param     = ptr::null_mut();
286        state.sendq.enqueue(s);
287    }
288
289    drop(_g); // release lock BEFORE parking
290    gopark(WaitReason::ChanSend);
291
292    // ── Resumed: inspect outcome ─────────────────────────────────────────────
293    let ok = unsafe {
294        let s2 = (*gp).param as *mut Sudog;
295        (*gp).param = ptr::null_mut();
296        let ok = (*s2).success;
297
298        if !ok && !(*s2).elem.is_null() {
299            // Send was rejected (channel closed after we parked).
300            // elem points to ManuallyDrop<T> — drop the value, then free the box.
301            let ep = (*s2).elem as *mut ManuallyDrop<T>;
302            (*s2).elem = ptr::null_mut();
303            ManuallyDrop::drop(&mut *ep); // run T's destructor
304            if (*s2).boxed_elem { let _ = Box::from_raw(ep); }
305        }
306        (*s2).g = ptr::null_mut();
307        (*s2).c = ptr::null_mut();
308        release_sudog(s2);
309        ok
310    };
311
312    if !ok {
313        panic!("send on closed channel");
314    }
315    true
316}
317
318// ---------------------------------------------------------------------------
319// chanrecv
320// ---------------------------------------------------------------------------
321
322/// Receive from `c`.
323///
324/// `block = true`  → park until a value or close.
325/// `block = false` → return `None` immediately if nothing is ready.
326///
327/// Returns `Some(val)` on success or `None` for closed-and-empty / would-block.
328///
329/// # Safety
330/// Must be called from a goroutine (not g0 or an OS-thread main function).
331///
332/// Ported from `chanrecv` in `runtime/chan.go`.
333pub(crate) unsafe fn chanrecv<T: Send + 'static>(
334    c:     &Arc<Hchan<T>>,
335    block: bool,
336) -> Option<T> {
337    let (_g, state) = unsafe { c.lock_state() };
338
339    // ── Case 1: direct handoff from a waiting sender ─────────────────────────
340    let send_sg = unsafe { state.sendq.dequeue() };
341    if !send_sg.is_null() {
342        let val = recv_from_sender(state, send_sg);
343        drop(_g);
344        return Some(val);
345    }
346
347    // ── Case 2: buffer has data ───────────────────────────────────────────────
348    if !state.buf.is_empty() {
349        return Some(state.buf.pop_front().unwrap());
350    }
351
352    // ── Case 3: closed and empty ──────────────────────────────────────────────
353    if state.closed {
354        return None;
355    }
356
357    // ── Case 4: non-blocking — nothing ready ─────────────────────────────────
358    if !block {
359        return None;
360    }
361
362    // ── Case 5: block — enqueue as a waiting receiver ────────────────────────
363    let gp = current_g();
364    debug_assert!(!gp.is_null(), "chanrecv: called from g0");
365
366    // Box<Option<T>> — the sender writes Some(val) into this slot, or it
367    // stays None if the channel closes.  Freed on wakeup.
368    let elem_ptr = Box::into_raw(Box::new(None::<T>)) as *mut u8;
369
370    let s = acquire_sudog();
371    unsafe {
372        (*s).g          = gp;
373        (*s).elem       = elem_ptr;
374        (*s).boxed_elem = true; // Box<Option<T>> — must be freed on wakeup
375        (*s).success    = false;
376        (*s).c          = Arc::as_ptr(c) as *mut u8;
377        (*gp).param     = ptr::null_mut();
378        state.recvq.enqueue(s);
379    }
380
381    drop(_g);
382    gopark(WaitReason::ChanReceive);
383
384    // ── Resumed: read outcome ─────────────────────────────────────────────────
385    unsafe {
386        let s2 = (*gp).param as *mut Sudog;
387        (*gp).param = ptr::null_mut();
388        let ok = (*s2).success;
389
390        let boxed = (*s2).boxed_elem;
391        // elem points to Option<T>: Some(val) if sender delivered, None if closed.
392        let result = if ok {
393            debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
394            let ep = (*s2).elem as *mut Option<T>;
395            (*s2).elem = ptr::null_mut();
396            // ptr::read bitwise-copies the Option<T> out of *ep.  The allocation
397            // at `ep` still holds the original bytes, but ownership of T has been
398            // transferred to `val` — running Option<T>'s destructor on *ep would
399            // double-drop T.  Cast to ManuallyDrop<Option<T>> to free the Box
400            // allocation without triggering Option<T>::drop().
401            let val = ptr::read(ep).expect("chanrecv: elem was None on success");
402            if boxed {
403                let _ = Box::from_raw(ep as *mut std::mem::ManuallyDrop<Option<T>>);
404            }
405            Some(val)
406        } else {
407            if !(*s2).elem.is_null() {
408                // Channel was closed — the slot contains None (T was never written).
409                // Dropping None does not touch any T; no ManuallyDrop needed.
410                let ep = (*s2).elem as *mut Option<T>;
411                (*s2).elem = ptr::null_mut();
412                if boxed { let _ = Box::from_raw(ep); }
413            }
414            None
415        };
416
417        (*s2).g = ptr::null_mut();
418        (*s2).c = ptr::null_mut();
419        release_sudog(s2);
420        result
421    }
422}
423
424/// Non-blocking receive.
425///
426/// Returns:
427/// - `Some(Some(v))` — value received.
428/// - `Some(None)`    — channel closed and empty.
429/// - `None`          — would block (channel has nothing ready right now).
430///
431/// # Safety
432/// May be called outside the scheduler as long as the blocking path is never
433/// triggered.
434pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
435    c: &Arc<Hchan<T>>,
436) -> Option<Option<T>> {
437    let (_g, state) = unsafe { c.lock_state() };
438
439    let send_sg = unsafe { state.sendq.dequeue() };
440    if !send_sg.is_null() {
441        let val = recv_from_sender(state, send_sg);
442        drop(_g);
443        return Some(Some(val));
444    }
445
446    if !state.buf.is_empty() {
447        return Some(Some(state.buf.pop_front().unwrap()));
448    }
449
450    if state.closed {
451        return Some(None);
452    }
453
454    None
455}
456
457/// Receive from a **dequeued** sender sudog and wake the sender.
458///
459/// For unbuffered channels (`cap == 0`): value is moved directly from the
460/// sender's staging box.
461/// For buffered channels (always full when a sender is queued): take the head
462/// of the buffer, rotate the sender's value into the tail.
463///
464/// **Caller must release the channel lock after this returns**, before the
465/// woken goroutine can be scheduled.
466///
467/// Ported from `recv` in `runtime/chan.go`.
468fn recv_from_sender<T: Send + 'static>(
469    state:   &mut HchanState<T>,
470    send_sg: *mut Sudog,
471) -> T {
472    let gp = unsafe { (*send_sg).g };
473
474    let boxed = unsafe { (*send_sg).boxed_elem };
475
476    // send_sg.elem points to ManuallyDrop<T> (both chansend and selectgo use
477    // ManuallyDrop semantics — same memory layout as T, no destructor).
478    let val = if state.cap == 0 {
479        let ep = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
480        let v  = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
481        unsafe {
482            if boxed { let _ = Box::from_raw(ep); }
483            (*send_sg).elem = ptr::null_mut();
484        }
485        v
486    } else {
487        let head = state.buf.pop_front().unwrap();
488        let ep   = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
489        let sv   = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
490        unsafe {
491            if boxed { let _ = Box::from_raw(ep); }
492            (*send_sg).elem = ptr::null_mut();
493        }
494        state.buf.push_back(sv);
495        head
496    };
497
498    unsafe {
499        (*send_sg).success = true;
500        (*gp).param        = send_sg as *mut u8;
501    }
502    unsafe { goready(gp) };
503    val
504}
505
506// ---------------------------------------------------------------------------
507// closechan
508// ---------------------------------------------------------------------------
509
510/// Close `c`.
511///
512/// Marks the channel closed, drains all waiting receivers (they get `None`)
513/// and senders (they panic), and wakes all of them.
514///
515/// # Panics
516/// Panics if the channel is already closed.
517///
518/// # Safety
519/// Must be called from a goroutine (not g0 / OS-thread main).
520///
521/// Ported from `closechan` in `runtime/chan.go`.
522pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
523    let (_g, state) = unsafe { c.lock_state() };
524
525    if state.closed {
526        drop(_g);
527        panic!("close of closed channel");
528    }
529    state.closed = true;
530
531    let mut wakeup: Vec<*mut crate::runtime::g::G> = Vec::new();
532
533    loop {
534        let sg = unsafe { state.recvq.dequeue() };
535        if sg.is_null() { break; }
536        let gp = unsafe { (*sg).g };
537        unsafe {
538            (*sg).success = false;
539            (*gp).param   = sg as *mut u8;
540        }
541        wakeup.push(gp);
542    }
543
544    loop {
545        let sg = unsafe { state.sendq.dequeue() };
546        if sg.is_null() { break; }
547        let gp = unsafe { (*sg).g };
548        unsafe {
549            (*sg).success = false;
550            (*gp).param   = sg as *mut u8;
551        }
552        wakeup.push(gp);
553    }
554
555    drop(_g);
556
557    for gp in wakeup {
558        unsafe { goready(gp) };
559    }
560}
561
562// ---------------------------------------------------------------------------
563// Tests
564// ---------------------------------------------------------------------------
565
566#[cfg(all(test, not(loom)))]
567mod tests {
568    use super::*;
569    use crate::runtime::sched::run_impl;
570    use std::sync::atomic::{AtomicI32, Ordering};
571    use std::sync::Arc;
572
573    // ── Buffered: fast paths (no goroutine park) ──────────────────────────────
574
575    /// Single send + recv completes without blocking.
576    #[test]
577    fn buffered_send_recv() {
578        run_impl(|| {
579            let (tx, rx) = chan::<i32>(1);
580            tx.send(42);
581            assert_eq!(rx.recv(), Some(42));
582        });
583    }
584
585    /// Values arrive in FIFO order.
586    #[test]
587    fn buffered_fifo_order() {
588        run_impl(|| {
589            let (tx, rx) = chan::<i32>(4);
590            for i in 0..4_i32 { tx.send(i); }
591            for i in 0..4_i32 { assert_eq!(rx.recv(), Some(i)); }
592        });
593    }
594
595    /// Close drains buffered values, then recv returns None.
596    #[test]
597    fn buffered_close_drains_then_none() {
598        run_impl(|| {
599            let (tx, rx) = chan::<i32>(2);
600            tx.send(1);
601            tx.send(2);
602            tx.close();
603            assert_eq!(rx.recv(), Some(1));
604            assert_eq!(rx.recv(), Some(2));
605            assert_eq!(rx.recv(), None);
606            assert_eq!(rx.recv(), None); // idempotent
607        });
608    }
609
610    // ── Non-blocking ops ──────────────────────────────────────────────────────
611
612    /// try_recv on an empty open channel returns None (would block).
613    #[test]
614    fn try_recv_empty() {
615        run_impl(|| {
616            let (_tx, rx) = chan::<i32>(4);
617            assert_eq!(rx.try_recv(), None);
618        });
619    }
620
621    /// try_recv on a closed empty channel returns Some(None).
622    #[test]
623    fn try_recv_closed_empty() {
624        run_impl(|| {
625            let (tx, rx) = chan::<i32>(4);
626            tx.close();
627            assert_eq!(rx.try_recv(), Some(None));
628        });
629    }
630
631    /// try_send to a full channel returns false.
632    #[test]
633    fn try_send_full() {
634        run_impl(|| {
635            let (tx, _rx) = chan::<i32>(2);
636            assert!(tx.try_send(1));
637            assert!(tx.try_send(2));
638            assert!(!tx.try_send(3));
639        });
640    }
641
642    // ── Panic paths ───────────────────────────────────────────────────────────
643    //
644    // These don't exercise goroutine parking — the panic must unwind back to
645    // the test thread's #[should_panic] handler, so we must NOT wrap in run_impl.
646
647    /// Closing an already-closed channel panics.
648    #[test]
649    #[should_panic(expected = "close of closed channel")]
650    fn close_twice_panics() {
651        let (tx, _rx) = chan::<i32>(1);
652        tx.close();
653        tx.close();
654    }
655
656    /// Sending on a closed channel panics.
657    #[test]
658    #[should_panic(expected = "send on closed channel")]
659    fn send_on_closed_panics() {
660        let (tx, _rx) = chan::<i32>(1);
661        tx.close();
662        tx.send(1);
663    }
664
665    // ── Goroutine rendezvous (exercises park/unpark) ──────────────────────────
666
667    /// Unbuffered send and recv across two goroutines.
668    #[test]
669    fn unbuffered_rendezvous() {
670        use crate::runtime::sched::spawn_goroutine;
671
672        run_impl(|| {
673            let (tx, rx) = chan::<i32>(0);
674            spawn_goroutine(move || { tx.send(99); });
675            assert_eq!(rx.recv(), Some(99));
676        });
677    }
678
679    /// Ping-pong ten rounds across two goroutines.
680    #[test]
681    fn unbuffered_ping_pong() {
682        use crate::runtime::sched::spawn_goroutine;
683
684        run_impl(|| {
685            let (ping_tx, ping_rx) = chan::<i32>(0);
686            let (pong_tx, pong_rx) = chan::<i32>(0);
687
688            spawn_goroutine(move || {
689                for _ in 0..10 {
690                    let v = ping_rx.recv().unwrap();
691                    pong_tx.send(v + 1);
692                }
693            });
694
695            let mut n = 0_i32;
696            for _ in 0..10 {
697                ping_tx.send(n);
698                n = pong_rx.recv().unwrap();
699            }
700            assert_eq!(n, 10);
701        });
702    }
703
704    /// Buffered producer/consumer: 20 values summed by a goroutine.
705    #[test]
706    fn producer_consumer() {
707        use crate::runtime::sched::spawn_goroutine;
708
709        const N: i32 = 20;
710        let sum = Arc::new(AtomicI32::new(0));
711        let sum2 = Arc::clone(&sum);
712
713        run_impl(move || {
714            let (tx, rx) = chan::<i32>(4);
715            let sum3 = Arc::clone(&sum2);
716
717            spawn_goroutine(move || {
718                for i in 0..N { tx.send(i); }
719                tx.close();
720            });
721
722            spawn_goroutine(move || {
723                while let Some(v) = rx.recv() {
724                    sum3.fetch_add(v, Ordering::Relaxed);
725                }
726            });
727
728            // A wall-clock deadline is robust across CI runner speeds and
729            // build profiles (debug, coverage/nightly) where frame sizes and
730            // instrumentation overhead can make goroutines run much slower.
731            let expected = N * (N - 1) / 2;
732            let deadline =
733                std::time::Instant::now() + std::time::Duration::from_secs(5);
734            while sum2.load(Ordering::Acquire) != expected
735                && std::time::Instant::now() < deadline
736            {
737                crate::gosched();
738            }
739        });
740
741        assert_eq!(sum.load(Ordering::Acquire), N * (N - 1) / 2);
742    }
743    
744    /// multi-producer, single consumer: 10 producers summing 10 values
745    #[test]
746    fn multi_producer_single_consumer() {
747        use crate::runtime::sched::spawn_goroutine;
748        use crate::sync::WaitGroup;
749        
750        const N: i32 = 10;
751        let total = N*N;
752        let sum = Arc::new(AtomicI32::new(0));
753        let sum2 = Arc::clone(&sum);
754
755        run_impl(move || {
756            let (tx, rx) = chan::<i32>(4);
757            let sum3 = Arc::clone(&sum2);
758            let wg  = Arc::new(WaitGroup::new());
759
760            for g in 0 .. N {
761                let start = N*g;
762                let end = start+N;
763                let tx_clone = tx.clone();
764                let wg_clone = Arc::clone(&wg);
765                // add(1) must happen before spawn_goroutine so that wg.wait()
766                // cannot return before all producers have registered themselves.
767                wg_clone.add(1);
768                spawn_goroutine(move || {
769                    for i in start..end { tx_clone.send(i); }
770                    wg_clone.done()
771                });
772            }
773            
774            spawn_goroutine(move || {
775                while let Some(v) = rx.recv() {
776                    sum3.fetch_add(v, Ordering::Relaxed);
777                }
778            });
779
780            wg.wait();
781            tx.close();
782            
783            // A wall-clock deadline is robust across CI runner speeds and
784            // build profiles (debug, coverage/nightly) where frame sizes and
785            // instrumentation overhead can make goroutines run much slower.
786            let expected = total * (total - 1) / 2;
787            let deadline =
788                std::time::Instant::now() + std::time::Duration::from_secs(5);
789            while sum2.load(Ordering::Acquire) != expected
790                && std::time::Instant::now() < deadline
791            {
792                crate::gosched();
793            }
794        });
795
796        assert_eq!(sum.load(Ordering::Acquire), total * (total - 1) / 2);
797
798    }
799
800    /// Close wakes a goroutine that is blocked on recv.
801    #[test]
802    fn close_wakes_blocked_receiver() {
803        use crate::runtime::sched::spawn_goroutine;
804
805        let got_none = Arc::new(AtomicI32::new(0));
806        let got2 = Arc::clone(&got_none);
807        let got3 = Arc::clone(&got_none); // checked inside run_impl to bound the post-close loop
808
809        run_impl(move || {
810            let (tx, rx) = chan::<i32>(0);
811
812            spawn_goroutine(move || {
813                // Block on recv until the channel is closed.
814                if rx.recv().is_none() {
815                    got2.fetch_add(1, Ordering::Relaxed);
816                }
817            });
818
819            // Give the spawned goroutine time to start and block on recv.
820            // More iterations than before because parallel test load on CI can
821            // starve goroutines for many scheduler rounds.
822            for _ in 0..500 { crate::gosched(); }
823            tx.close();
824            // Wait for the goroutine to observe the close and record its result.
825            // A wall-clock deadline is robust across CI runner speeds.
826            let deadline =
827                std::time::Instant::now() + std::time::Duration::from_secs(5);
828            while got3.load(Ordering::Acquire) == 0
829                && std::time::Instant::now() < deadline
830            {
831                crate::gosched();
832            }
833        });
834
835        assert_eq!(got_none.load(Ordering::Acquire), 1);
836    }
837}