go_lib/chan.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Channels — ported from `src/runtime/chan.go`.
3//!
4//! Buffered and unbuffered channels backed by the same G/M/P scheduler used by
5//! goroutines. A goroutine that blocks on a channel send or receive is parked
6//! via `gopark` and resumed via `goready`; no OS thread is ever blocked.
7//!
8//! ## Public surface
9//!
10//! ```no_run
11//! let (tx, rx) = go_lib::chan::chan::<i32>(0); // unbuffered
12//! let (tx, rx) = go_lib::chan::chan::<i32>(16); // buffered, capacity 16
13//!
14//! tx.send(42_i32);
15//! let v = rx.recv(); // Some(42); None means closed + empty
16//! tx.close();
17//! ```
18//!
19//! ## Internals
20//!
21//! Each channel is an `Arc<Hchan<T>>`. The lock-protected interior holds a
22//! `VecDeque<T>` ring buffer plus two wait queues (`sendq` / `recvq`) of
23//! `Sudog` records (one per blocked goroutine).
24//!
25//! Locking uses `RawMutex` (not `std::sync::Mutex`) so that `selectgo` can
26//! hold multiple heterogeneous channel locks simultaneously without needing a
27//! typed `MutexGuard<HchanState<T>>` for each one.
28//!
29//! ### Blocking protocol
30//!
31//! When a goroutine must block it:
32//! 1. Allocates a `Sudog` from the pool.
33//! 2. Heap-allocates a `ManuallyDrop<T>` (send) or `Option<T>` (recv) as the
34//! value staging area (`sudog.elem`).
35//! 3. Enqueues the sudog in `sendq` / `recvq` (under the channel lock).
36//! 4. Releases the lock.
37//! 5. Calls `gopark` — `park_fn` sets `GWAITING` on g0's stack.
38//!
39//! The goroutine that completes the operation:
40//! - Reads or writes through `sudog.elem`.
41//! - Sets `sudog.success` and `(*gp).param = sudog as *mut u8`.
42//! - Calls `goready`, which spins until the target is `GWAITING` before
43//! marking it `GRUNNABLE`.
44//!
45//! ### Close semantics (matches Go)
46//!
47//! - Sending on a closed channel **panics**.
48//! - Receiving from a closed empty channel returns `None`.
49//! - Closing an already-closed channel **panics**.
50//!
51//! Ported from `hchan`, `chansend`, `chanrecv`, `closechan` in
52//! `runtime/chan.go`.
53
54use std::cell::UnsafeCell;
55use std::collections::VecDeque;
56use std::mem::ManuallyDrop;
57use std::ptr;
58use std::sync::Arc;
59
60use crate::runtime::g::{current_g, WaitReason};
61use crate::runtime::park::{gopark, goready};
62use crate::runtime::rawmutex::{LockGuard, RawMutex};
63use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};
64
65// ---------------------------------------------------------------------------
66// Hchan — the heap channel object
67// ---------------------------------------------------------------------------
68
69/// Lock-protected interior of a channel.
70pub(crate) struct HchanState<T> {
71 /// Buffered elements waiting to be received (FIFO).
72 pub(crate) buf: VecDeque<T>,
73 /// Buffer capacity (0 = unbuffered / synchronous).
74 pub(crate) cap: usize,
75 /// True after `close()`.
76 pub(crate) closed: bool,
77 /// Goroutines blocked in `send` (buffer full or unbuffered with no receiver).
78 pub(crate) sendq: WaitQ,
79 /// Goroutines blocked in `recv` (buffer empty or unbuffered with no sender).
80 pub(crate) recvq: WaitQ,
81}
82
83impl<T> HchanState<T> {
84 fn new(cap: usize) -> Self {
85 Self {
86 buf: VecDeque::with_capacity(cap),
87 cap,
88 closed: false,
89 sendq: WaitQ::new(),
90 recvq: WaitQ::new(),
91 }
92 }
93}
94
95/// The channel heap object, shared via `Arc` between all `Sender`/`Receiver`
96/// clones.
97///
98/// `pub(crate)` so that `selectgo` (step 14) can access the state directly.
99///
100/// The `mutex` field is first so that `Arc::as_ptr(h) as *const RawMutex` gives
101/// a stable address suitable for address-ordered lock acquisition in `selectgo`.
102pub(crate) struct Hchan<T> {
103 /// Raw adaptive spinlock protecting `state`.
104 ///
105 /// Exposed `pub(crate)` so `selectgo` can lock/unlock multiple heterogeneous
106 /// channels without needing typed `MutexGuard` storage.
107 pub(crate) mutex: RawMutex,
108 /// Interior state — always accessed under `mutex`.
109 pub(crate) state: UnsafeCell<HchanState<T>>,
110}
111
112unsafe impl<T: Send> Send for Hchan<T> {}
113unsafe impl<T: Send> Sync for Hchan<T> {}
114
115impl<T> Hchan<T> {
116 pub(crate) fn new(cap: usize) -> Self {
117 Self {
118 mutex: RawMutex::new(),
119 state: UnsafeCell::new(HchanState::new(cap)),
120 }
121 }
122
123 /// Acquire the lock and return a guard + mutable state reference.
124 ///
125 /// The guard releases the lock when dropped. Drop it *before* calling
126 /// `gopark` so the scheduler can't see the lock still held.
127 ///
128 /// # Safety
129 /// The returned `&mut HchanState<T>` must not be used after the guard is
130 /// dropped (the lock no longer protects access).
131 #[allow(clippy::mut_from_ref)] // intentional: state is behind UnsafeCell
132 pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
133 let g = LockGuard::new(&self.mutex);
134 // SAFETY: We just acquired the lock; no other thread holds a reference.
135 let s = unsafe { &mut *self.state.get() };
136 (g, s)
137 }
138}
139
140// ---------------------------------------------------------------------------
141// Public channel halves
142// ---------------------------------------------------------------------------
143
144/// The sending half of a channel. Cheap to `clone`.
145pub struct Sender<T>(Arc<Hchan<T>>);
146
147/// The receiving half of a channel. Cheap to `clone`.
148pub struct Receiver<T>(Arc<Hchan<T>>);
149
150impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Sender(Arc::clone(&self.0)) } }
151impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }
152
153unsafe impl<T: Send> Send for Sender<T> {}
154unsafe impl<T: Send> Sync for Sender<T> {}
155unsafe impl<T: Send> Send for Receiver<T> {}
156unsafe impl<T: Send> Sync for Receiver<T> {}
157
158/// Create a new channel with the given buffer capacity.
159///
160/// `cap == 0` gives an unbuffered (synchronous rendezvous) channel; `cap > 0`
161/// gives a buffered channel that holds up to `cap` values without blocking the
162/// sender.
163///
164/// Returns `(Sender<T>, Receiver<T>)`.
165pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
166 let h = Arc::new(Hchan::new(cap));
167 (Sender(Arc::clone(&h)), Receiver(h))
168}
169
170impl<T: Send + 'static> Sender<T> {
171 /// Send `val`, blocking until a receiver is ready or buffer space opens.
172 ///
173 /// # Panics
174 /// Panics if the channel has been closed.
175 pub fn send(&self, val: T) {
176 unsafe { chansend(&self.0, val, true) };
177 }
178
179 /// Non-blocking send. Returns `false` if the buffer is full or there is
180 /// no waiting receiver. Panics if the channel is closed.
181 pub fn try_send(&self, val: T) -> bool {
182 unsafe { chansend(&self.0, val, false) }
183 }
184
185 /// Close the channel. Panics if already closed.
186 pub fn close(&self) {
187 unsafe { closechan(&self.0) };
188 }
189
190 /// Raw access to the underlying `Hchan` for use by `selectgo`.
191 pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
192}
193
194impl<T: Send + 'static> Receiver<T> {
195 /// Receive a value, blocking until one is available or the channel closes.
196 ///
197 /// Returns `Some(val)` on success, `None` if the channel is closed and
198 /// the buffer is fully drained.
199 pub fn recv(&self) -> Option<T> {
200 unsafe { chanrecv(&self.0, true) }
201 }
202
203 /// Non-blocking receive.
204 ///
205 /// - `Some(Some(val))` — received.
206 /// - `Some(None)` — channel closed and empty.
207 /// - `None` — would block (nothing ready yet).
208 pub fn try_recv(&self) -> Option<Option<T>> {
209 unsafe { chanrecv_nb(&self.0) }
210 }
211
212 /// Raw access to the underlying `Hchan` for use by `selectgo`.
213 pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
214}
215
216// ---------------------------------------------------------------------------
217// chansend
218// ---------------------------------------------------------------------------
219
220/// Send `val` to `c`.
221///
222/// `block = true` → park the goroutine if the channel has no space.
223/// `block = false` → return `false` immediately if the channel has no space.
224///
225/// # Safety
226/// Must be called from a goroutine (not g0 or an OS-thread main function).
227///
228/// Ported from `chansend` in `runtime/chan.go`.
229pub(crate) unsafe fn chansend<T: Send + 'static>(
230 c: &Arc<Hchan<T>>,
231 val: T,
232 block: bool,
233) -> bool {
234 // SAFETY: we hold the lock for the duration of the guard's scope.
235 let (_g, state) = unsafe { c.lock_state() };
236
237 if state.closed {
238 drop(_g);
239 panic!("send on closed channel");
240 }
241
242 // ── Case 1: direct handoff to a waiting receiver ─────────────────────────
243 let recv_sg = unsafe { state.recvq.dequeue() };
244 if !recv_sg.is_null() {
245 let gp = unsafe { (*recv_sg).g };
246 // recv_sg.elem points to Option<T> (allocated by chanrecv or selectgo).
247 let elem_ptr = unsafe { (*recv_sg).elem as *mut Option<T> };
248 if !elem_ptr.is_null() {
249 unsafe { *elem_ptr = Some(val) };
250 }
251 unsafe {
252 (*recv_sg).success = true;
253 (*gp).param = recv_sg as *mut u8;
254 }
255 drop(_g);
256 unsafe { goready(gp) };
257 return true;
258 }
259
260 // ── Case 2: buffer has space ──────────────────────────────────────────────
261 if state.buf.len() < state.cap {
262 state.buf.push_back(val);
263 return true;
264 }
265
266 // ── Case 3: non-blocking — cannot proceed ────────────────────────────────
267 if !block {
268 return false;
269 }
270
271 // ── Case 4: block — enqueue this goroutine as a waiting sender ───────────
272 let gp = current_g();
273 debug_assert!(!gp.is_null(), "chansend: called from g0");
274
275 // Box<ManuallyDrop<T>> — the receiver moves the value out and frees the box.
276 let elem_ptr = Box::into_raw(Box::new(ManuallyDrop::new(val))) as *mut u8;
277
278 let s = acquire_sudog();
279 unsafe {
280 (*s).g = gp;
281 (*s).elem = elem_ptr;
282 (*s).boxed_elem = true; // Box<ManuallyDrop<T>> — must be freed by receiver
283 (*s).success = false;
284 (*s).c = Arc::as_ptr(c) as *mut u8;
285 (*gp).param = ptr::null_mut();
286 state.sendq.enqueue(s);
287 }
288
289 drop(_g); // release lock BEFORE parking
290 gopark(WaitReason::ChanSend);
291
292 // ── Resumed: inspect outcome ─────────────────────────────────────────────
293 let ok = unsafe {
294 let s2 = (*gp).param as *mut Sudog;
295 (*gp).param = ptr::null_mut();
296 let ok = (*s2).success;
297
298 if !ok && !(*s2).elem.is_null() {
299 // Send was rejected (channel closed after we parked).
300 // elem points to ManuallyDrop<T> — drop the value, then free the box.
301 let ep = (*s2).elem as *mut ManuallyDrop<T>;
302 (*s2).elem = ptr::null_mut();
303 ManuallyDrop::drop(&mut *ep); // run T's destructor
304 if (*s2).boxed_elem { let _ = Box::from_raw(ep); }
305 }
306 (*s2).g = ptr::null_mut();
307 (*s2).c = ptr::null_mut();
308 release_sudog(s2);
309 ok
310 };
311
312 if !ok {
313 panic!("send on closed channel");
314 }
315 true
316}
317
318// ---------------------------------------------------------------------------
319// chanrecv
320// ---------------------------------------------------------------------------
321
322/// Receive from `c`.
323///
324/// `block = true` → park until a value or close.
325/// `block = false` → return `None` immediately if nothing is ready.
326///
327/// Returns `Some(val)` on success or `None` for closed-and-empty / would-block.
328///
329/// # Safety
330/// Must be called from a goroutine (not g0 or an OS-thread main function).
331///
332/// Ported from `chanrecv` in `runtime/chan.go`.
333pub(crate) unsafe fn chanrecv<T: Send + 'static>(
334 c: &Arc<Hchan<T>>,
335 block: bool,
336) -> Option<T> {
337 let (_g, state) = unsafe { c.lock_state() };
338
339 // ── Case 1: direct handoff from a waiting sender ─────────────────────────
340 let send_sg = unsafe { state.sendq.dequeue() };
341 if !send_sg.is_null() {
342 let val = recv_from_sender(state, send_sg);
343 drop(_g);
344 return Some(val);
345 }
346
347 // ── Case 2: buffer has data ───────────────────────────────────────────────
348 if !state.buf.is_empty() {
349 return Some(state.buf.pop_front().unwrap());
350 }
351
352 // ── Case 3: closed and empty ──────────────────────────────────────────────
353 if state.closed {
354 return None;
355 }
356
357 // ── Case 4: non-blocking — nothing ready ─────────────────────────────────
358 if !block {
359 return None;
360 }
361
362 // ── Case 5: block — enqueue as a waiting receiver ────────────────────────
363 let gp = current_g();
364 debug_assert!(!gp.is_null(), "chanrecv: called from g0");
365
366 // Box<Option<T>> — the sender writes Some(val) into this slot, or it
367 // stays None if the channel closes. Freed on wakeup.
368 let elem_ptr = Box::into_raw(Box::new(None::<T>)) as *mut u8;
369
370 let s = acquire_sudog();
371 unsafe {
372 (*s).g = gp;
373 (*s).elem = elem_ptr;
374 (*s).boxed_elem = true; // Box<Option<T>> — must be freed on wakeup
375 (*s).success = false;
376 (*s).c = Arc::as_ptr(c) as *mut u8;
377 (*gp).param = ptr::null_mut();
378 state.recvq.enqueue(s);
379 }
380
381 drop(_g);
382 gopark(WaitReason::ChanReceive);
383
384 // ── Resumed: read outcome ─────────────────────────────────────────────────
385 unsafe {
386 let s2 = (*gp).param as *mut Sudog;
387 (*gp).param = ptr::null_mut();
388 let ok = (*s2).success;
389
390 let boxed = (*s2).boxed_elem;
391 // elem points to Option<T>: Some(val) if sender delivered, None if closed.
392 let result = if ok {
393 debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
394 let ep = (*s2).elem as *mut Option<T>;
395 (*s2).elem = ptr::null_mut();
396 // ptr::read moves the Option<T> out without running its destructor on ep.
397 let val = ptr::read(ep).expect("chanrecv: elem was None on success");
398 if boxed { let _ = Box::from_raw(ep); }
399 Some(val)
400 } else {
401 if !(*s2).elem.is_null() {
402 // Channel was closed — slot is None; just free the allocation.
403 let ep = (*s2).elem as *mut Option<T>;
404 (*s2).elem = ptr::null_mut();
405 if boxed { let _ = Box::from_raw(ep); }
406 }
407 None
408 };
409
410 (*s2).g = ptr::null_mut();
411 (*s2).c = ptr::null_mut();
412 release_sudog(s2);
413 result
414 }
415}
416
417/// Non-blocking receive.
418///
419/// Returns:
420/// - `Some(Some(v))` — value received.
421/// - `Some(None)` — channel closed and empty.
422/// - `None` — would block (channel has nothing ready right now).
423///
424/// # Safety
425/// May be called outside the scheduler as long as the blocking path is never
426/// triggered.
427pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
428 c: &Arc<Hchan<T>>,
429) -> Option<Option<T>> {
430 let (_g, state) = unsafe { c.lock_state() };
431
432 let send_sg = unsafe { state.sendq.dequeue() };
433 if !send_sg.is_null() {
434 let val = recv_from_sender(state, send_sg);
435 drop(_g);
436 return Some(Some(val));
437 }
438
439 if !state.buf.is_empty() {
440 return Some(Some(state.buf.pop_front().unwrap()));
441 }
442
443 if state.closed {
444 return Some(None);
445 }
446
447 None
448}
449
450/// Receive from a **dequeued** sender sudog and wake the sender.
451///
452/// For unbuffered channels (`cap == 0`): value is moved directly from the
453/// sender's staging box.
454/// For buffered channels (always full when a sender is queued): take the head
455/// of the buffer, rotate the sender's value into the tail.
456///
457/// **Caller must release the channel lock after this returns**, before the
458/// woken goroutine can be scheduled.
459///
460/// Ported from `recv` in `runtime/chan.go`.
461fn recv_from_sender<T: Send + 'static>(
462 state: &mut HchanState<T>,
463 send_sg: *mut Sudog,
464) -> T {
465 let gp = unsafe { (*send_sg).g };
466
467 let boxed = unsafe { (*send_sg).boxed_elem };
468
469 // send_sg.elem points to ManuallyDrop<T> (both chansend and selectgo use
470 // ManuallyDrop semantics — same memory layout as T, no destructor).
471 let val = if state.cap == 0 {
472 let ep = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
473 let v = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
474 unsafe {
475 if boxed { let _ = Box::from_raw(ep); }
476 (*send_sg).elem = ptr::null_mut();
477 }
478 v
479 } else {
480 let head = state.buf.pop_front().unwrap();
481 let ep = unsafe { (*send_sg).elem as *mut ManuallyDrop<T> };
482 let sv = unsafe { ManuallyDrop::into_inner(ptr::read(ep)) };
483 unsafe {
484 if boxed { let _ = Box::from_raw(ep); }
485 (*send_sg).elem = ptr::null_mut();
486 }
487 state.buf.push_back(sv);
488 head
489 };
490
491 unsafe {
492 (*send_sg).success = true;
493 (*gp).param = send_sg as *mut u8;
494 }
495 unsafe { goready(gp) };
496 val
497}
498
499// ---------------------------------------------------------------------------
500// closechan
501// ---------------------------------------------------------------------------
502
503/// Close `c`.
504///
505/// Marks the channel closed, drains all waiting receivers (they get `None`)
506/// and senders (they panic), and wakes all of them.
507///
508/// # Panics
509/// Panics if the channel is already closed.
510///
511/// # Safety
512/// Must be called from a goroutine (not g0 / OS-thread main).
513///
514/// Ported from `closechan` in `runtime/chan.go`.
515pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
516 let (_g, state) = unsafe { c.lock_state() };
517
518 if state.closed {
519 drop(_g);
520 panic!("close of closed channel");
521 }
522 state.closed = true;
523
524 let mut wakeup: Vec<*mut crate::runtime::g::G> = Vec::new();
525
526 loop {
527 let sg = unsafe { state.recvq.dequeue() };
528 if sg.is_null() { break; }
529 let gp = unsafe { (*sg).g };
530 unsafe {
531 (*sg).success = false;
532 (*gp).param = sg as *mut u8;
533 }
534 wakeup.push(gp);
535 }
536
537 loop {
538 let sg = unsafe { state.sendq.dequeue() };
539 if sg.is_null() { break; }
540 let gp = unsafe { (*sg).g };
541 unsafe {
542 (*sg).success = false;
543 (*gp).param = sg as *mut u8;
544 }
545 wakeup.push(gp);
546 }
547
548 drop(_g);
549
550 for gp in wakeup {
551 unsafe { goready(gp) };
552 }
553}
554
555// ---------------------------------------------------------------------------
556// Tests
557// ---------------------------------------------------------------------------
558
559#[cfg(all(test, not(loom)))]
560mod tests {
561 use super::*;
562 use crate::runtime::sched::run_impl;
563 use std::sync::atomic::{AtomicI32, Ordering};
564 use std::sync::Arc;
565
566 // ── Buffered: fast paths (no goroutine park) ──────────────────────────────
567
568 /// Single send + recv completes without blocking.
569 #[test]
570 fn buffered_send_recv() {
571 run_impl(|| {
572 let (tx, rx) = chan::<i32>(1);
573 tx.send(42);
574 assert_eq!(rx.recv(), Some(42));
575 });
576 }
577
578 /// Values arrive in FIFO order.
579 #[test]
580 fn buffered_fifo_order() {
581 run_impl(|| {
582 let (tx, rx) = chan::<i32>(4);
583 for i in 0..4_i32 { tx.send(i); }
584 for i in 0..4_i32 { assert_eq!(rx.recv(), Some(i)); }
585 });
586 }
587
588 /// Close drains buffered values, then recv returns None.
589 #[test]
590 fn buffered_close_drains_then_none() {
591 run_impl(|| {
592 let (tx, rx) = chan::<i32>(2);
593 tx.send(1);
594 tx.send(2);
595 tx.close();
596 assert_eq!(rx.recv(), Some(1));
597 assert_eq!(rx.recv(), Some(2));
598 assert_eq!(rx.recv(), None);
599 assert_eq!(rx.recv(), None); // idempotent
600 });
601 }
602
603 // ── Non-blocking ops ──────────────────────────────────────────────────────
604
605 /// try_recv on an empty open channel returns None (would block).
606 #[test]
607 fn try_recv_empty() {
608 run_impl(|| {
609 let (_tx, rx) = chan::<i32>(4);
610 assert_eq!(rx.try_recv(), None);
611 });
612 }
613
614 /// try_recv on a closed empty channel returns Some(None).
615 #[test]
616 fn try_recv_closed_empty() {
617 run_impl(|| {
618 let (tx, rx) = chan::<i32>(4);
619 tx.close();
620 assert_eq!(rx.try_recv(), Some(None));
621 });
622 }
623
624 /// try_send to a full channel returns false.
625 #[test]
626 fn try_send_full() {
627 run_impl(|| {
628 let (tx, _rx) = chan::<i32>(2);
629 assert!(tx.try_send(1));
630 assert!(tx.try_send(2));
631 assert!(!tx.try_send(3));
632 });
633 }
634
635 // ── Panic paths ───────────────────────────────────────────────────────────
636 //
637 // These don't exercise goroutine parking — the panic must unwind back to
638 // the test thread's #[should_panic] handler, so we must NOT wrap in run_impl.
639
640 /// Closing an already-closed channel panics.
641 #[test]
642 #[should_panic(expected = "close of closed channel")]
643 fn close_twice_panics() {
644 let (tx, _rx) = chan::<i32>(1);
645 tx.close();
646 tx.close();
647 }
648
649 /// Sending on a closed channel panics.
650 #[test]
651 #[should_panic(expected = "send on closed channel")]
652 fn send_on_closed_panics() {
653 let (tx, _rx) = chan::<i32>(1);
654 tx.close();
655 tx.send(1);
656 }
657
658 // ── Goroutine rendezvous (exercises park/unpark) ──────────────────────────
659
660 /// Unbuffered send and recv across two goroutines.
661 #[test]
662 fn unbuffered_rendezvous() {
663 use crate::runtime::sched::spawn_goroutine;
664
665 run_impl(|| {
666 let (tx, rx) = chan::<i32>(0);
667 spawn_goroutine(move || { tx.send(99); });
668 assert_eq!(rx.recv(), Some(99));
669 });
670 }
671
672 /// Ping-pong ten rounds across two goroutines.
673 #[test]
674 fn unbuffered_ping_pong() {
675 use crate::runtime::sched::spawn_goroutine;
676
677 run_impl(|| {
678 let (ping_tx, ping_rx) = chan::<i32>(0);
679 let (pong_tx, pong_rx) = chan::<i32>(0);
680
681 spawn_goroutine(move || {
682 for _ in 0..10 {
683 let v = ping_rx.recv().unwrap();
684 pong_tx.send(v + 1);
685 }
686 });
687
688 let mut n = 0_i32;
689 for _ in 0..10 {
690 ping_tx.send(n);
691 n = pong_rx.recv().unwrap();
692 }
693 assert_eq!(n, 10);
694 });
695 }
696
697 /// Buffered producer/consumer: 20 values summed by a goroutine.
698 #[test]
699 fn producer_consumer() {
700 use crate::runtime::sched::spawn_goroutine;
701
702 const N: i32 = 20;
703 let sum = Arc::new(AtomicI32::new(0));
704 let sum2 = Arc::clone(&sum);
705
706 run_impl(move || {
707 let (tx, rx) = chan::<i32>(4);
708 let sum3 = Arc::clone(&sum2);
709
710 spawn_goroutine(move || {
711 for i in 0..N { tx.send(i); }
712 tx.close();
713 });
714
715 spawn_goroutine(move || {
716 while let Some(v) = rx.recv() {
717 sum3.fetch_add(v, Ordering::Relaxed);
718 }
719 });
720
721 // A wall-clock deadline is robust across CI runner speeds and
722 // build profiles (debug, coverage/nightly) where frame sizes and
723 // instrumentation overhead can make goroutines run much slower.
724 let expected = N * (N - 1) / 2;
725 let deadline =
726 std::time::Instant::now() + std::time::Duration::from_secs(5);
727 while sum2.load(Ordering::Acquire) != expected
728 && std::time::Instant::now() < deadline
729 {
730 crate::gosched();
731 }
732 });
733
734 assert_eq!(sum.load(Ordering::Acquire), N * (N - 1) / 2);
735 }
736
737 /// Close wakes a goroutine that is blocked on recv.
738 #[test]
739 fn close_wakes_blocked_receiver() {
740 use crate::runtime::sched::spawn_goroutine;
741
742 let got_none = Arc::new(AtomicI32::new(0));
743 let got2 = Arc::clone(&got_none);
744 let got3 = Arc::clone(&got_none); // checked inside run_impl to bound the post-close loop
745
746 run_impl(move || {
747 let (tx, rx) = chan::<i32>(0);
748
749 spawn_goroutine(move || {
750 // Block on recv until the channel is closed.
751 if rx.recv().is_none() {
752 got2.fetch_add(1, Ordering::Relaxed);
753 }
754 });
755
756 // Give the spawned goroutine time to start and block on recv.
757 // More iterations than before because parallel test load on CI can
758 // starve goroutines for many scheduler rounds.
759 for _ in 0..500 { crate::gosched(); }
760 tx.close();
761 // Wait for the goroutine to observe the close and record its result.
762 // A wall-clock deadline is robust across CI runner speeds.
763 let deadline =
764 std::time::Instant::now() + std::time::Duration::from_secs(5);
765 while got3.load(Ordering::Acquire) == 0
766 && std::time::Instant::now() < deadline
767 {
768 crate::gosched();
769 }
770 });
771
772 assert_eq!(got_none.load(Ordering::Acquire), 1);
773 }
774}