Skip to main content

go_lib/
chan.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Channels — ported from `src/runtime/chan.go`.
3//!
4//! Buffered and unbuffered channels backed by the same G/M/P scheduler used by
5//! goroutines.  A goroutine that blocks on a channel send or receive is parked
6//! via `gopark` and resumed via `goready`; no OS thread is ever blocked.
7//!
8//! ## Public surface
9//!
10//! ```no_run
11//! let (tx, rx) = go_lib::chan::chan::<i32>(0);   // unbuffered
12//! let (tx, rx) = go_lib::chan::chan::<i32>(16);  // buffered, capacity 16
13//!
14//! tx.send(42_i32);
15//! let v = rx.recv();  // Some(42); None means closed + empty
16//! tx.close();
17//! ```
18//!
19//! ## Internals
20//!
21//! Each channel is an `Arc<Hchan<T>>`.  The lock-protected interior holds a
22//! `VecDeque<T>` ring buffer plus two wait queues (`sendq` / `recvq`) of
23//! `Sudog` records (one per blocked goroutine).
24//!
25//! Locking uses `RawMutex` (not `std::sync::Mutex`) so that `selectgo` can
26//! hold multiple heterogeneous channel locks simultaneously without needing a
27//! typed `MutexGuard<HchanState<T>>` for each one.
28//!
29//! ### Blocking protocol
30//!
31//! When a goroutine must block it:
32//! 1. Allocates a `Sudog` from the pool.
33//! 2. Heap-allocates a `MaybeUninit<T>` as the value staging area (`sudog.elem`).
34//! 3. Enqueues the sudog in `sendq` / `recvq` (under the channel lock).
35//! 4. Releases the lock.
36//! 5. Calls `gopark` — `park_fn` sets `GWAITING` on g0's stack.
37//!
38//! The goroutine that completes the operation:
39//! - Reads or writes through `sudog.elem`.
40//! - Sets `sudog.success` and `(*gp).param = sudog as *mut u8`.
41//! - Calls `goready`, which spins until the target is `GWAITING` before
42//!   marking it `GRUNNABLE`.
43//!
44//! ### Close semantics (matches Go)
45//!
46//! - Sending on a closed channel **panics**.
47//! - Receiving from a closed empty channel returns `None`.
48//! - Closing an already-closed channel **panics**.
49//!
50//! Ported from `hchan`, `chansend`, `chanrecv`, `closechan` in
51//! `runtime/chan.go`.
52
53use std::cell::UnsafeCell;
54use std::collections::VecDeque;
55use std::mem::MaybeUninit;
56use std::ptr;
57use std::sync::Arc;
58
59use crate::runtime::g::{current_g, WaitReason};
60use crate::runtime::park::{gopark, goready};
61use crate::runtime::rawmutex::{LockGuard, RawMutex};
62use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};
63
64// ---------------------------------------------------------------------------
65// Hchan — the heap channel object
66// ---------------------------------------------------------------------------
67
68/// Lock-protected interior of a channel.
69pub(crate) struct HchanState<T> {
70    /// Buffered elements waiting to be received (FIFO).
71    pub(crate) buf:    VecDeque<T>,
72    /// Buffer capacity (0 = unbuffered / synchronous).
73    pub(crate) cap:    usize,
74    /// True after `close()`.
75    pub(crate) closed: bool,
76    /// Goroutines blocked in `send` (buffer full or unbuffered with no receiver).
77    pub(crate) sendq:  WaitQ,
78    /// Goroutines blocked in `recv` (buffer empty or unbuffered with no sender).
79    pub(crate) recvq:  WaitQ,
80}
81
82impl<T> HchanState<T> {
83    fn new(cap: usize) -> Self {
84        Self {
85            buf:    VecDeque::with_capacity(cap),
86            cap,
87            closed: false,
88            sendq:  WaitQ::new(),
89            recvq:  WaitQ::new(),
90        }
91    }
92}
93
94/// The channel heap object, shared via `Arc` between all `Sender`/`Receiver`
95/// clones.
96///
97/// `pub(crate)` so that `selectgo` (step 14) can access the state directly.
98///
99/// The `mutex` field is first so that `Arc::as_ptr(h) as *const RawMutex` gives
100/// a stable address suitable for address-ordered lock acquisition in `selectgo`.
101pub(crate) struct Hchan<T> {
102    /// Raw adaptive spinlock protecting `state`.
103    ///
104    /// Exposed `pub(crate)` so `selectgo` can lock/unlock multiple heterogeneous
105    /// channels without needing typed `MutexGuard` storage.
106    pub(crate) mutex: RawMutex,
107    /// Interior state — always accessed under `mutex`.
108    pub(crate) state: UnsafeCell<HchanState<T>>,
109}
110
111unsafe impl<T: Send> Send for Hchan<T> {}
112unsafe impl<T: Send> Sync for Hchan<T> {}
113
114impl<T> Hchan<T> {
115    pub(crate) fn new(cap: usize) -> Self {
116        Self {
117            mutex: RawMutex::new(),
118            state: UnsafeCell::new(HchanState::new(cap)),
119        }
120    }
121
122    /// Acquire the lock and return a guard + mutable state reference.
123    ///
124    /// The guard releases the lock when dropped.  Drop it *before* calling
125    /// `gopark` so the scheduler can't see the lock still held.
126    ///
127    /// # Safety
128    /// The returned `&mut HchanState<T>` must not be used after the guard is
129    /// dropped (the lock no longer protects access).
130    #[allow(clippy::mut_from_ref)] // intentional: state is behind UnsafeCell
131    pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
132        let g = LockGuard::new(&self.mutex);
133        // SAFETY: We just acquired the lock; no other thread holds a reference.
134        let s = unsafe { &mut *self.state.get() };
135        (g, s)
136    }
137}
138
139// ---------------------------------------------------------------------------
140// Public channel halves
141// ---------------------------------------------------------------------------
142
143/// The sending half of a channel.  Cheap to `clone`.
144pub struct Sender<T>(Arc<Hchan<T>>);
145
146/// The receiving half of a channel.  Cheap to `clone`.
147pub struct Receiver<T>(Arc<Hchan<T>>);
148
149impl<T> Clone for Sender<T>   { fn clone(&self) -> Self { Sender(Arc::clone(&self.0))   } }
150impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }
151
152unsafe impl<T: Send> Send for Sender<T>   {}
153unsafe impl<T: Send> Sync for Sender<T>   {}
154unsafe impl<T: Send> Send for Receiver<T> {}
155unsafe impl<T: Send> Sync for Receiver<T> {}
156
157/// Create a new channel with the given buffer capacity.
158///
159/// `cap == 0` gives an unbuffered (synchronous rendezvous) channel; `cap > 0`
160/// gives a buffered channel that holds up to `cap` values without blocking the
161/// sender.
162///
163/// Returns `(Sender<T>, Receiver<T>)`.
164pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
165    let h = Arc::new(Hchan::new(cap));
166    (Sender(Arc::clone(&h)), Receiver(h))
167}
168
169impl<T: Send + 'static> Sender<T> {
170    /// Send `val`, blocking until a receiver is ready or buffer space opens.
171    ///
172    /// # Panics
173    /// Panics if the channel has been closed.
174    pub fn send(&self, val: T) {
175        unsafe { chansend(&self.0, val, true) };
176    }
177
178    /// Non-blocking send.  Returns `false` if the buffer is full or there is
179    /// no waiting receiver.  Panics if the channel is closed.
180    pub fn try_send(&self, val: T) -> bool {
181        unsafe { chansend(&self.0, val, false) }
182    }
183
184    /// Close the channel.  Panics if already closed.
185    pub fn close(&self) {
186        unsafe { closechan(&self.0) };
187    }
188
189    /// Raw access to the underlying `Hchan` for use by `selectgo`.
190    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
191}
192
193impl<T: Send + 'static> Receiver<T> {
194    /// Receive a value, blocking until one is available or the channel closes.
195    ///
196    /// Returns `Some(val)` on success, `None` if the channel is closed and
197    /// the buffer is fully drained.
198    pub fn recv(&self) -> Option<T> {
199        unsafe { chanrecv(&self.0, true) }
200    }
201
202    /// Non-blocking receive.
203    ///
204    /// - `Some(Some(val))` — received.
205    /// - `Some(None)`      — channel closed and empty.
206    /// - `None`            — would block (nothing ready yet).
207    pub fn try_recv(&self) -> Option<Option<T>> {
208        unsafe { chanrecv_nb(&self.0) }
209    }
210
211    /// Raw access to the underlying `Hchan` for use by `selectgo`.
212    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
213}
214
215// ---------------------------------------------------------------------------
216// chansend
217// ---------------------------------------------------------------------------
218
219/// Send `val` to `c`.
220///
221/// `block = true`  → park the goroutine if the channel has no space.
222/// `block = false` → return `false` immediately if the channel has no space.
223///
224/// # Safety
225/// Must be called from a goroutine (not g0 or an OS-thread main function).
226///
227/// Ported from `chansend` in `runtime/chan.go`.
228pub(crate) unsafe fn chansend<T: Send + 'static>(
229    c:     &Arc<Hchan<T>>,
230    val:   T,
231    block: bool,
232) -> bool {
233    // SAFETY: we hold the lock for the duration of the guard's scope.
234    let (_g, state) = unsafe { c.lock_state() };
235
236    if state.closed {
237        drop(_g);
238        panic!("send on closed channel");
239    }
240
241    // ── Case 1: direct handoff to a waiting receiver ─────────────────────────
242    let recv_sg = unsafe { state.recvq.dequeue() };
243    if !recv_sg.is_null() {
244        let gp       = unsafe { (*recv_sg).g };
245        let elem_ptr = unsafe { (*recv_sg).elem as *mut MaybeUninit<T> };
246        if !elem_ptr.is_null() {
247            unsafe { (*elem_ptr).write(val) };
248        }
249        unsafe {
250            (*recv_sg).success = true;
251            (*gp).param        = recv_sg as *mut u8;
252        }
253        drop(_g);
254        unsafe { goready(gp) };
255        return true;
256    }
257
258    // ── Case 2: buffer has space ──────────────────────────────────────────────
259    if state.buf.len() < state.cap {
260        state.buf.push_back(val);
261        return true;
262    }
263
264    // ── Case 3: non-blocking — cannot proceed ────────────────────────────────
265    if !block {
266        return false;
267    }
268
269    // ── Case 4: block — enqueue this goroutine as a waiting sender ───────────
270    let gp = current_g();
271    debug_assert!(!gp.is_null(), "chansend: called from g0");
272
273    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::new(val))) as *mut u8;
274
275    let s = acquire_sudog();
276    unsafe {
277        (*s).g          = gp;
278        (*s).elem       = elem_ptr;
279        (*s).boxed_elem = true; // Box<MaybeUninit<T>> — must be freed by receiver
280        (*s).success    = false;
281        (*s).c          = Arc::as_ptr(c) as *mut u8;
282        (*gp).param     = ptr::null_mut();
283        state.sendq.enqueue(s);
284    }
285
286    drop(_g); // release lock BEFORE parking
287    unsafe { gopark(WaitReason::ChanSend) };
288
289    // ── Resumed: inspect outcome ─────────────────────────────────────────────
290    let ok = unsafe {
291        let s2 = (*gp).param as *mut Sudog;
292        (*gp).param = ptr::null_mut();
293        let ok = (*s2).success;
294
295        if !ok && !(*s2).elem.is_null() {
296            let ep = (*s2).elem as *mut MaybeUninit<T>;
297            (*s2).elem = ptr::null_mut();
298            (*ep).assume_init_drop();
299            if (*s2).boxed_elem { let _ = Box::from_raw(ep); }
300        }
301        (*s2).g = ptr::null_mut();
302        (*s2).c = ptr::null_mut();
303        release_sudog(s2);
304        ok
305    };
306
307    if !ok {
308        panic!("send on closed channel");
309    }
310    true
311}
312
313// ---------------------------------------------------------------------------
314// chanrecv
315// ---------------------------------------------------------------------------
316
317/// Receive from `c`.
318///
319/// `block = true`  → park until a value or close.
320/// `block = false` → return `None` immediately if nothing is ready.
321///
322/// Returns `Some(val)` on success or `None` for closed-and-empty / would-block.
323///
324/// # Safety
325/// Must be called from a goroutine (not g0 or an OS-thread main function).
326///
327/// Ported from `chanrecv` in `runtime/chan.go`.
328pub(crate) unsafe fn chanrecv<T: Send + 'static>(
329    c:     &Arc<Hchan<T>>,
330    block: bool,
331) -> Option<T> {
332    let (_g, state) = unsafe { c.lock_state() };
333
334    // ── Case 1: direct handoff from a waiting sender ─────────────────────────
335    let send_sg = unsafe { state.sendq.dequeue() };
336    if !send_sg.is_null() {
337        let val = recv_from_sender(state, send_sg);
338        drop(_g);
339        return Some(val);
340    }
341
342    // ── Case 2: buffer has data ───────────────────────────────────────────────
343    if !state.buf.is_empty() {
344        return Some(state.buf.pop_front().unwrap());
345    }
346
347    // ── Case 3: closed and empty ──────────────────────────────────────────────
348    if state.closed {
349        return None;
350    }
351
352    // ── Case 4: non-blocking — nothing ready ─────────────────────────────────
353    if !block {
354        return None;
355    }
356
357    // ── Case 5: block — enqueue as a waiting receiver ────────────────────────
358    let gp = current_g();
359    debug_assert!(!gp.is_null(), "chanrecv: called from g0");
360
361    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::<T>::uninit())) as *mut u8;
362
363    let s = acquire_sudog();
364    unsafe {
365        (*s).g          = gp;
366        (*s).elem       = elem_ptr;
367        (*s).boxed_elem = true; // Box<MaybeUninit<T>> — must be freed on wakeup
368        (*s).success    = false;
369        (*s).c          = Arc::as_ptr(c) as *mut u8;
370        (*gp).param     = ptr::null_mut();
371        state.recvq.enqueue(s);
372    }
373
374    drop(_g);
375    unsafe { gopark(WaitReason::ChanReceive) };
376
377    // ── Resumed: read outcome ─────────────────────────────────────────────────
378    unsafe {
379        let s2 = (*gp).param as *mut Sudog;
380        (*gp).param = ptr::null_mut();
381        let ok = (*s2).success;
382
383        let boxed = (*s2).boxed_elem;
384        let result = if ok {
385            debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
386            let ep = (*s2).elem as *mut MaybeUninit<T>;
387            (*s2).elem = ptr::null_mut();
388            let val = (*ep).assume_init_read();
389            if boxed { let _ = Box::from_raw(ep); }
390            Some(val)
391        } else {
392            if !(*s2).elem.is_null() {
393                let ep = (*s2).elem as *mut MaybeUninit<T>;
394                (*s2).elem = ptr::null_mut();
395                if boxed { let _ = Box::from_raw(ep); } // uninitialised — don't assume_init
396            }
397            None
398        };
399
400        (*s2).g = ptr::null_mut();
401        (*s2).c = ptr::null_mut();
402        release_sudog(s2);
403        result
404    }
405}
406
407/// Non-blocking receive.
408///
409/// Returns:
410/// - `Some(Some(v))` — value received.
411/// - `Some(None)`    — channel closed and empty.
412/// - `None`          — would block (channel has nothing ready right now).
413///
414/// # Safety
415/// May be called outside the scheduler as long as the blocking path is never
416/// triggered.
417pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
418    c: &Arc<Hchan<T>>,
419) -> Option<Option<T>> {
420    let (_g, state) = unsafe { c.lock_state() };
421
422    let send_sg = unsafe { state.sendq.dequeue() };
423    if !send_sg.is_null() {
424        let val = recv_from_sender(state, send_sg);
425        drop(_g);
426        return Some(Some(val));
427    }
428
429    if !state.buf.is_empty() {
430        return Some(Some(state.buf.pop_front().unwrap()));
431    }
432
433    if state.closed {
434        return Some(None);
435    }
436
437    None
438}
439
440/// Receive from a **dequeued** sender sudog and wake the sender.
441///
442/// For unbuffered channels (`cap == 0`): value is moved directly from the
443/// sender's staging box.
444/// For buffered channels (always full when a sender is queued): take the head
445/// of the buffer, rotate the sender's value into the tail.
446///
447/// **Caller must release the channel lock after this returns**, before the
448/// woken goroutine can be scheduled.
449///
450/// Ported from `recv` in `runtime/chan.go`.
451fn recv_from_sender<T: Send + 'static>(
452    state:   &mut HchanState<T>,
453    send_sg: *mut Sudog,
454) -> T {
455    let gp = unsafe { (*send_sg).g };
456
457    let boxed = unsafe { (*send_sg).boxed_elem };
458
459    let val = if state.cap == 0 {
460        let ep = unsafe { (*send_sg).elem as *mut MaybeUninit<T> };
461        let v  = unsafe { (*ep).assume_init_read() };
462        unsafe {
463            if boxed { let _ = Box::from_raw(ep); }
464            (*send_sg).elem = ptr::null_mut();
465        }
466        v
467    } else {
468        let head = state.buf.pop_front().unwrap();
469        let ep   = unsafe { (*send_sg).elem as *mut MaybeUninit<T> };
470        let sv   = unsafe { (*ep).assume_init_read() };
471        unsafe {
472            if boxed { let _ = Box::from_raw(ep); }
473            (*send_sg).elem = ptr::null_mut();
474        }
475        state.buf.push_back(sv);
476        head
477    };
478
479    unsafe {
480        (*send_sg).success = true;
481        (*gp).param        = send_sg as *mut u8;
482    }
483    unsafe { goready(gp) };
484    val
485}
486
487// ---------------------------------------------------------------------------
488// closechan
489// ---------------------------------------------------------------------------
490
491/// Close `c`.
492///
493/// Marks the channel closed, drains all waiting receivers (they get `None`)
494/// and senders (they panic), and wakes all of them.
495///
496/// # Panics
497/// Panics if the channel is already closed.
498///
499/// # Safety
500/// Must be called from a goroutine (not g0 / OS-thread main).
501///
502/// Ported from `closechan` in `runtime/chan.go`.
503pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
504    let (_g, state) = unsafe { c.lock_state() };
505
506    if state.closed {
507        drop(_g);
508        panic!("close of closed channel");
509    }
510    state.closed = true;
511
512    let mut wakeup: Vec<*mut crate::runtime::g::G> = Vec::new();
513
514    loop {
515        let sg = unsafe { state.recvq.dequeue() };
516        if sg.is_null() { break; }
517        let gp = unsafe { (*sg).g };
518        unsafe {
519            (*sg).success = false;
520            (*gp).param   = sg as *mut u8;
521        }
522        wakeup.push(gp);
523    }
524
525    loop {
526        let sg = unsafe { state.sendq.dequeue() };
527        if sg.is_null() { break; }
528        let gp = unsafe { (*sg).g };
529        unsafe {
530            (*sg).success = false;
531            (*gp).param   = sg as *mut u8;
532        }
533        wakeup.push(gp);
534    }
535
536    drop(_g);
537
538    for gp in wakeup {
539        unsafe { goready(gp) };
540    }
541}
542
543// ---------------------------------------------------------------------------
544// Tests
545// ---------------------------------------------------------------------------
546
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::{run_impl, spawn_goroutine};
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    // ── Buffered: fast paths (no goroutine park) ──────────────────────────────

    /// One value goes through a capacity-1 channel without blocking.
    #[test]
    fn buffered_send_recv() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(1);
            sender.send(42);
            let got = receiver.recv();
            assert_eq!(got, Some(42));
        });
    }

    /// Buffered values come out in the order they went in.
    #[test]
    fn buffered_fifo_order() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(4);
            (0..4_i32).for_each(|value| sender.send(value));
            for expected in 0..4_i32 {
                assert_eq!(receiver.recv(), Some(expected));
            }
        });
    }

    /// After close, buffered values are still delivered; then recv yields None.
    #[test]
    fn buffered_close_drains_then_none() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(2);
            sender.send(1);
            sender.send(2);
            sender.close();

            let first = receiver.recv();
            assert_eq!(first, Some(1));
            let second = receiver.recv();
            assert_eq!(second, Some(2));
            assert_eq!(receiver.recv(), None);
            assert_eq!(receiver.recv(), None); // idempotent
        });
    }

    // ── Non-blocking ops ──────────────────────────────────────────────────────

    /// try_recv on an open, empty channel reports "would block" (None).
    #[test]
    fn try_recv_empty() {
        run_impl(|| {
            let (_sender, receiver) = chan::<i32>(4);
            let outcome = receiver.try_recv();
            assert_eq!(outcome, None);
        });
    }

    /// try_recv on a closed, empty channel reports Some(None).
    #[test]
    fn try_recv_closed_empty() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(4);
            sender.close();
            let outcome = receiver.try_recv();
            assert_eq!(outcome, Some(None));
        });
    }

    /// try_send succeeds until the buffer is full, then returns false.
    #[test]
    fn try_send_full() {
        run_impl(|| {
            let (sender, _receiver) = chan::<i32>(2);
            let first = sender.try_send(1);
            assert!(first);
            let second = sender.try_send(2);
            assert!(second);
            let third = sender.try_send(3);
            assert!(!third);
        });
    }

    // ── Panic paths ───────────────────────────────────────────────────────────
    //
    // These don't exercise goroutine parking — the panic must unwind back to
    // the test thread's #[should_panic] handler, so we must NOT wrap in run_impl.

    /// A second close on the same channel panics.
    #[test]
    #[should_panic(expected = "close of closed channel")]
    fn close_twice_panics() {
        let (sender, _receiver) = chan::<i32>(1);
        sender.close();
        sender.close();
    }

    /// A send after close panics.
    #[test]
    #[should_panic(expected = "send on closed channel")]
    fn send_on_closed_panics() {
        let (sender, _receiver) = chan::<i32>(1);
        sender.close();
        sender.send(1);
    }

    // ── Goroutine rendezvous (exercises park/unpark) ──────────────────────────

    /// One unbuffered send/recv pair across two goroutines.
    #[test]
    fn unbuffered_rendezvous() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    sender.send(99);
                });
            }
            let got = receiver.recv();
            assert_eq!(got, Some(99));
        });
    }

    /// Ten ping-pong rounds between two goroutines; each round adds one.
    #[test]
    fn unbuffered_ping_pong() {
        run_impl(|| {
            let (ping_tx, ping_rx) = chan::<i32>(0);
            let (pong_tx, pong_rx) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    for _round in 0..10 {
                        let incoming = ping_rx.recv().unwrap();
                        pong_tx.send(incoming + 1);
                    }
                });
            }

            let mut counter = 0_i32;
            for _round in 0..10 {
                ping_tx.send(counter);
                counter = pong_rx.recv().unwrap();
            }
            assert_eq!(counter, 10);
        });
    }

    /// Buffered producer/consumer: a goroutine sums the 20 values it receives.
    #[test]
    fn producer_consumer() {
        const N: i32 = 20;
        let total = Arc::new(AtomicI32::new(0));
        let total_for_sched = Arc::clone(&total);

        run_impl(move || {
            let (sender, receiver) = chan::<i32>(4);
            let total_for_consumer = Arc::clone(&total_for_sched);

            // Producer: send 0..N, then close.
            unsafe {
                spawn_goroutine(move || {
                    (0..N).for_each(|value| sender.send(value));
                    sender.close();
                });
            }

            // Consumer: accumulate until the channel reports closed.
            unsafe {
                spawn_goroutine(move || {
                    while let Some(value) = receiver.recv() {
                        total_for_consumer.fetch_add(value, Ordering::Relaxed);
                    }
                });
            }

            // Yield repeatedly so both goroutines get to run to completion.
            for _yield in 0..500 {
                crate::gosched();
            }
        });

        let expected = N * (N - 1) / 2;
        assert_eq!(total.load(Ordering::Acquire), expected);
    }

    /// A goroutine blocked in recv is woken by close and observes None.
    #[test]
    fn close_wakes_blocked_receiver() {
        let none_seen = Arc::new(AtomicI32::new(0));
        let none_seen_in_sched = Arc::clone(&none_seen);

        run_impl(move || {
            let (sender, receiver) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    // Block on recv until the channel is closed.
                    let outcome = receiver.recv();
                    if outcome.is_none() {
                        none_seen_in_sched.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }

            // Give the receiver a chance to park before closing …
            for _yield in 0..20 {
                crate::gosched();
            }
            sender.close();
            // … and a chance to run again afterwards.
            for _yield in 0..20 {
                crate::gosched();
            }
        });

        assert_eq!(none_seen.load(Ordering::Acquire), 1);
    }
}
750}