go-lib 0.2.0

rust native goroutines
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
// SPDX-License-Identifier: Apache-2.0
//! Channels — ported from `src/runtime/chan.go`.
//!
//! Buffered and unbuffered channels backed by the same G/M/P scheduler used by
//! goroutines.  A goroutine that blocks on a channel send or receive is parked
//! via `gopark` and resumed via `goready`; no OS thread is ever blocked.
//!
//! ## Public surface
//!
//! ```no_run
//! let (tx, rx) = go_lib::chan::chan::<i32>(0);   // unbuffered
//! let (tx, rx) = go_lib::chan::chan::<i32>(16);  // buffered, capacity 16
//!
//! tx.send(42_i32);
//! let v = rx.recv();  // Some(42); None means closed + empty
//! tx.close();
//! ```
//!
//! ## Internals
//!
//! Each channel is an `Arc<Hchan<T>>`.  The lock-protected interior holds a
//! `VecDeque<T>` ring buffer plus two wait queues (`sendq` / `recvq`) of
//! `Sudog` records (one per blocked goroutine).
//!
//! Locking uses `RawMutex` (not `std::sync::Mutex`) so that `selectgo` can
//! hold multiple heterogeneous channel locks simultaneously without needing a
//! typed `MutexGuard<HchanState<T>>` for each one.
//!
//! ### Blocking protocol
//!
//! When a goroutine must block it:
//! 1. Allocates a `Sudog` from the pool.
//! 2. Heap-allocates a `MaybeUninit<T>` as the value staging area (`sudog.elem`).
//! 3. Enqueues the sudog in `sendq` / `recvq` (under the channel lock).
//! 4. Releases the lock.
//! 5. Calls `gopark` — `park_fn` sets `GWAITING` on g0's stack.
//!
//! The goroutine that completes the operation:
//! - Reads or writes through `sudog.elem`.
//! - Sets `sudog.success` and `(*gp).param = sudog as *mut u8`.
//! - Calls `goready`, which spins until the target is `GWAITING` before
//!   marking it `GRUNNABLE`.
//!
//! ### Close semantics (matches Go)
//!
//! - Sending on a closed channel **panics**.
//! - Receiving from a closed empty channel returns `None`.
//! - Closing an already-closed channel **panics**.
//!
//! Ported from `hchan`, `chansend`, `chanrecv`, `closechan` in
//! `runtime/chan.go`.

use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::Arc;

use crate::runtime::g::{current_g, WaitReason};
use crate::runtime::park::{gopark, goready};
use crate::runtime::rawmutex::{LockGuard, RawMutex};
use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};

// ---------------------------------------------------------------------------
// Hchan — the heap channel object
// ---------------------------------------------------------------------------

/// Lock-protected interior of a channel.
pub(crate) struct HchanState<T> {
    /// Buffered elements waiting to be received (FIFO).
    pub(crate) buf:    VecDeque<T>,
    /// Buffer capacity (0 = unbuffered / synchronous).
    pub(crate) cap:    usize,
    /// True after `close()`.
    pub(crate) closed: bool,
    /// Goroutines blocked in `send` (buffer full or unbuffered with no receiver).
    pub(crate) sendq:  WaitQ,
    /// Goroutines blocked in `recv` (buffer empty or unbuffered with no sender).
    pub(crate) recvq:  WaitQ,
}

impl<T> HchanState<T> {
    fn new(cap: usize) -> Self {
        Self {
            buf:    VecDeque::with_capacity(cap),
            cap,
            closed: false,
            sendq:  WaitQ::new(),
            recvq:  WaitQ::new(),
        }
    }
}

/// The channel heap object, shared via `Arc` between all `Sender`/`Receiver`
/// clones.
///
/// `pub(crate)` so that `selectgo` (step 14) can access the state directly.
///
/// The `mutex` field is first so that `Arc::as_ptr(h) as *const RawMutex` gives
/// a stable address suitable for address-ordered lock acquisition in `selectgo`.
pub(crate) struct Hchan<T> {
    /// Raw adaptive spinlock protecting `state`.
    ///
    /// Exposed `pub(crate)` so `selectgo` can lock/unlock multiple heterogeneous
    /// channels without needing typed `MutexGuard` storage.
    pub(crate) mutex: RawMutex,
    /// Interior state — always accessed under `mutex`.
    pub(crate) state: UnsafeCell<HchanState<T>>,
}

unsafe impl<T: Send> Send for Hchan<T> {}
unsafe impl<T: Send> Sync for Hchan<T> {}

impl<T> Hchan<T> {
    pub(crate) fn new(cap: usize) -> Self {
        Self {
            mutex: RawMutex::new(),
            state: UnsafeCell::new(HchanState::new(cap)),
        }
    }

    /// Acquire the lock and return a guard + mutable state reference.
    ///
    /// The guard releases the lock when dropped.  Drop it *before* calling
    /// `gopark` so the scheduler can't see the lock still held.
    ///
    /// # Safety
    /// The returned `&mut HchanState<T>` must not be used after the guard is
    /// dropped (the lock no longer protects access).
    #[allow(clippy::mut_from_ref)] // intentional: state is behind UnsafeCell
    pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
        let g = LockGuard::new(&self.mutex);
        // SAFETY: We just acquired the lock; no other thread holds a reference.
        let s = unsafe { &mut *self.state.get() };
        (g, s)
    }
}

// ---------------------------------------------------------------------------
// Public channel halves
// ---------------------------------------------------------------------------

/// The sending half of a channel.  Cheap to `clone`.
pub struct Sender<T>(Arc<Hchan<T>>);

/// The receiving half of a channel.  Cheap to `clone`.
pub struct Receiver<T>(Arc<Hchan<T>>);

impl<T> Clone for Sender<T>   { fn clone(&self) -> Self { Sender(Arc::clone(&self.0))   } }
impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }

unsafe impl<T: Send> Send for Sender<T>   {}
unsafe impl<T: Send> Sync for Sender<T>   {}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

/// Create a new channel with the given buffer capacity.
///
/// `cap == 0` gives an unbuffered (synchronous rendezvous) channel; `cap > 0`
/// gives a buffered channel that holds up to `cap` values without blocking the
/// sender.
///
/// Returns `(Sender<T>, Receiver<T>)`.
pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
    let h = Arc::new(Hchan::new(cap));
    (Sender(Arc::clone(&h)), Receiver(h))
}

impl<T: Send + 'static> Sender<T> {
    /// Send `val`, blocking until a receiver is ready or buffer space opens.
    ///
    /// # Panics
    /// Panics if the channel has been closed.
    pub fn send(&self, val: T) {
        unsafe { chansend(&self.0, val, true) };
    }

    /// Non-blocking send.  Returns `false` if the buffer is full or there is
    /// no waiting receiver.  Panics if the channel is closed.
    pub fn try_send(&self, val: T) -> bool {
        unsafe { chansend(&self.0, val, false) }
    }

    /// Close the channel.  Panics if already closed.
    pub fn close(&self) {
        unsafe { closechan(&self.0) };
    }

    /// Raw access to the underlying `Hchan` for use by `selectgo`.
    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
}

impl<T: Send + 'static> Receiver<T> {
    /// Receive a value, blocking until one is available or the channel closes.
    ///
    /// Returns `Some(val)` on success, `None` if the channel is closed and
    /// the buffer is fully drained.
    pub fn recv(&self) -> Option<T> {
        unsafe { chanrecv(&self.0, true) }
    }

    /// Non-blocking receive.
    ///
    /// - `Some(Some(val))` — received.
    /// - `Some(None)`      — channel closed and empty.
    /// - `None`            — would block (nothing ready yet).
    pub fn try_recv(&self) -> Option<Option<T>> {
        unsafe { chanrecv_nb(&self.0) }
    }

    /// Raw access to the underlying `Hchan` for use by `selectgo`.
    pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
}

// ---------------------------------------------------------------------------
// chansend
// ---------------------------------------------------------------------------

/// Send `val` to `c`.
///
/// `block = true`  → park the goroutine if the channel has no space.
/// `block = false` → return `false` immediately if the channel has no space.
///
/// # Safety
/// Must be called from a goroutine (not g0 or an OS-thread main function).
///
/// Ported from `chansend` in `runtime/chan.go`.
pub(crate) unsafe fn chansend<T: Send + 'static>(
    c:     &Arc<Hchan<T>>,
    val:   T,
    block: bool,
) -> bool {
    // SAFETY: we hold the lock for the duration of the guard's scope.
    let (_g, state) = unsafe { c.lock_state() };

    if state.closed {
        drop(_g);
        panic!("send on closed channel");
    }

    // ── Case 1: direct handoff to a waiting receiver ─────────────────────────
    let recv_sg = unsafe { state.recvq.dequeue() };
    if !recv_sg.is_null() {
        let gp       = unsafe { (*recv_sg).g };
        let elem_ptr = unsafe { (*recv_sg).elem as *mut MaybeUninit<T> };
        if !elem_ptr.is_null() {
            unsafe { (*elem_ptr).write(val) };
        }
        unsafe {
            (*recv_sg).success = true;
            (*gp).param        = recv_sg as *mut u8;
        }
        drop(_g);
        unsafe { goready(gp) };
        return true;
    }

    // ── Case 2: buffer has space ──────────────────────────────────────────────
    if state.buf.len() < state.cap {
        state.buf.push_back(val);
        return true;
    }

    // ── Case 3: non-blocking — cannot proceed ────────────────────────────────
    if !block {
        return false;
    }

    // ── Case 4: block — enqueue this goroutine as a waiting sender ───────────
    let gp = current_g();
    debug_assert!(!gp.is_null(), "chansend: called from g0");

    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::new(val))) as *mut u8;

    let s = acquire_sudog();
    unsafe {
        (*s).g          = gp;
        (*s).elem       = elem_ptr;
        (*s).boxed_elem = true; // Box<MaybeUninit<T>> — must be freed by receiver
        (*s).success    = false;
        (*s).c          = Arc::as_ptr(c) as *mut u8;
        (*gp).param     = ptr::null_mut();
        state.sendq.enqueue(s);
    }

    drop(_g); // release lock BEFORE parking
    unsafe { gopark(WaitReason::ChanSend) };

    // ── Resumed: inspect outcome ─────────────────────────────────────────────
    let ok = unsafe {
        let s2 = (*gp).param as *mut Sudog;
        (*gp).param = ptr::null_mut();
        let ok = (*s2).success;

        if !ok && !(*s2).elem.is_null() {
            let ep = (*s2).elem as *mut MaybeUninit<T>;
            (*s2).elem = ptr::null_mut();
            (*ep).assume_init_drop();
            if (*s2).boxed_elem { let _ = Box::from_raw(ep); }
        }
        (*s2).g = ptr::null_mut();
        (*s2).c = ptr::null_mut();
        release_sudog(s2);
        ok
    };

    if !ok {
        panic!("send on closed channel");
    }
    true
}

// ---------------------------------------------------------------------------
// chanrecv
// ---------------------------------------------------------------------------

/// Receive from `c`.
///
/// `block = true`  → park until a value or close.
/// `block = false` → return `None` immediately if nothing is ready.
///
/// Returns `Some(val)` on success or `None` for closed-and-empty / would-block.
///
/// # Safety
/// Must be called from a goroutine (not g0 or an OS-thread main function).
///
/// Ported from `chanrecv` in `runtime/chan.go`.
pub(crate) unsafe fn chanrecv<T: Send + 'static>(
    c:     &Arc<Hchan<T>>,
    block: bool,
) -> Option<T> {
    let (_g, state) = unsafe { c.lock_state() };

    // ── Case 1: direct handoff from a waiting sender ─────────────────────────
    let send_sg = unsafe { state.sendq.dequeue() };
    if !send_sg.is_null() {
        let val = recv_from_sender(state, send_sg);
        drop(_g);
        return Some(val);
    }

    // ── Case 2: buffer has data ───────────────────────────────────────────────
    if !state.buf.is_empty() {
        return Some(state.buf.pop_front().unwrap());
    }

    // ── Case 3: closed and empty ──────────────────────────────────────────────
    if state.closed {
        return None;
    }

    // ── Case 4: non-blocking — nothing ready ─────────────────────────────────
    if !block {
        return None;
    }

    // ── Case 5: block — enqueue as a waiting receiver ────────────────────────
    let gp = current_g();
    debug_assert!(!gp.is_null(), "chanrecv: called from g0");

    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::<T>::uninit())) as *mut u8;

    let s = acquire_sudog();
    unsafe {
        (*s).g          = gp;
        (*s).elem       = elem_ptr;
        (*s).boxed_elem = true; // Box<MaybeUninit<T>> — must be freed on wakeup
        (*s).success    = false;
        (*s).c          = Arc::as_ptr(c) as *mut u8;
        (*gp).param     = ptr::null_mut();
        state.recvq.enqueue(s);
    }

    drop(_g);
    unsafe { gopark(WaitReason::ChanReceive) };

    // ── Resumed: read outcome ─────────────────────────────────────────────────
    unsafe {
        let s2 = (*gp).param as *mut Sudog;
        (*gp).param = ptr::null_mut();
        let ok = (*s2).success;

        let boxed = (*s2).boxed_elem;
        let result = if ok {
            debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
            let ep = (*s2).elem as *mut MaybeUninit<T>;
            (*s2).elem = ptr::null_mut();
            let val = (*ep).assume_init_read();
            if boxed { let _ = Box::from_raw(ep); }
            Some(val)
        } else {
            if !(*s2).elem.is_null() {
                let ep = (*s2).elem as *mut MaybeUninit<T>;
                (*s2).elem = ptr::null_mut();
                if boxed { let _ = Box::from_raw(ep); } // uninitialised — don't assume_init
            }
            None
        };

        (*s2).g = ptr::null_mut();
        (*s2).c = ptr::null_mut();
        release_sudog(s2);
        result
    }
}

/// Non-blocking receive.
///
/// Returns:
/// - `Some(Some(v))` — value received.
/// - `Some(None)`    — channel closed and empty.
/// - `None`          — would block (channel has nothing ready right now).
///
/// # Safety
/// May be called outside the scheduler as long as the blocking path is never
/// triggered.
pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
    c: &Arc<Hchan<T>>,
) -> Option<Option<T>> {
    let (_g, state) = unsafe { c.lock_state() };

    let send_sg = unsafe { state.sendq.dequeue() };
    if !send_sg.is_null() {
        let val = recv_from_sender(state, send_sg);
        drop(_g);
        return Some(Some(val));
    }

    if !state.buf.is_empty() {
        return Some(Some(state.buf.pop_front().unwrap()));
    }

    if state.closed {
        return Some(None);
    }

    None
}

/// Receive from a **dequeued** sender sudog and wake the sender.
///
/// For unbuffered channels (`cap == 0`): value is moved directly from the
/// sender's staging box.
/// For buffered channels (always full when a sender is queued): take the head
/// of the buffer, rotate the sender's value into the tail.
///
/// **Caller must release the channel lock after this returns**, before the
/// woken goroutine can be scheduled.
///
/// Ported from `recv` in `runtime/chan.go`.
fn recv_from_sender<T: Send + 'static>(
    state:   &mut HchanState<T>,
    send_sg: *mut Sudog,
) -> T {
    let gp = unsafe { (*send_sg).g };

    let boxed = unsafe { (*send_sg).boxed_elem };

    let val = if state.cap == 0 {
        let ep = unsafe { (*send_sg).elem as *mut MaybeUninit<T> };
        let v  = unsafe { (*ep).assume_init_read() };
        unsafe {
            if boxed { let _ = Box::from_raw(ep); }
            (*send_sg).elem = ptr::null_mut();
        }
        v
    } else {
        let head = state.buf.pop_front().unwrap();
        let ep   = unsafe { (*send_sg).elem as *mut MaybeUninit<T> };
        let sv   = unsafe { (*ep).assume_init_read() };
        unsafe {
            if boxed { let _ = Box::from_raw(ep); }
            (*send_sg).elem = ptr::null_mut();
        }
        state.buf.push_back(sv);
        head
    };

    unsafe {
        (*send_sg).success = true;
        (*gp).param        = send_sg as *mut u8;
    }
    unsafe { goready(gp) };
    val
}

// ---------------------------------------------------------------------------
// closechan
// ---------------------------------------------------------------------------

/// Close `c`.
///
/// Marks the channel closed, drains all waiting receivers (they get `None`)
/// and senders (they panic), and wakes all of them.
///
/// # Panics
/// Panics if the channel is already closed.
///
/// # Safety
/// Must be called from a goroutine (not g0 / OS-thread main).
///
/// Ported from `closechan` in `runtime/chan.go`.
pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
    let (_g, state) = unsafe { c.lock_state() };

    if state.closed {
        drop(_g);
        panic!("close of closed channel");
    }
    state.closed = true;

    let mut wakeup: Vec<*mut crate::runtime::g::G> = Vec::new();

    loop {
        let sg = unsafe { state.recvq.dequeue() };
        if sg.is_null() { break; }
        let gp = unsafe { (*sg).g };
        unsafe {
            (*sg).success = false;
            (*gp).param   = sg as *mut u8;
        }
        wakeup.push(gp);
    }

    loop {
        let sg = unsafe { state.sendq.dequeue() };
        if sg.is_null() { break; }
        let gp = unsafe { (*sg).g };
        unsafe {
            (*sg).success = false;
            (*gp).param   = sg as *mut u8;
        }
        wakeup.push(gp);
    }

    drop(_g);

    for gp in wakeup {
        unsafe { goready(gp) };
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    // ── Buffered: fast paths (no goroutine park) ──────────────────────────────

    /// Single send + recv completes without blocking.
    #[test]
    fn buffered_send_recv() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(1);
            tx.send(42);
            assert_eq!(rx.recv(), Some(42));
        });
    }

    /// Values arrive in FIFO order.
    #[test]
    fn buffered_fifo_order() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            for i in 0..4_i32 { tx.send(i); }
            for i in 0..4_i32 { assert_eq!(rx.recv(), Some(i)); }
        });
    }

    /// Close drains buffered values, then recv returns None.
    #[test]
    fn buffered_close_drains_then_none() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(2);
            tx.send(1);
            tx.send(2);
            tx.close();
            assert_eq!(rx.recv(), Some(1));
            assert_eq!(rx.recv(), Some(2));
            assert_eq!(rx.recv(), None);
            assert_eq!(rx.recv(), None); // idempotent
        });
    }

    // ── Non-blocking ops ──────────────────────────────────────────────────────

    /// try_recv on an empty open channel returns None (would block).
    #[test]
    fn try_recv_empty() {
        run_impl(|| {
            let (_tx, rx) = chan::<i32>(4);
            assert_eq!(rx.try_recv(), None);
        });
    }

    /// try_recv on a closed empty channel returns Some(None).
    #[test]
    fn try_recv_closed_empty() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            tx.close();
            assert_eq!(rx.try_recv(), Some(None));
        });
    }

    /// try_send to a full channel returns false.
    #[test]
    fn try_send_full() {
        run_impl(|| {
            let (tx, _rx) = chan::<i32>(2);
            assert!(tx.try_send(1));
            assert!(tx.try_send(2));
            assert!(!tx.try_send(3));
        });
    }

    // ── Panic paths ───────────────────────────────────────────────────────────
    //
    // These don't exercise goroutine parking — the panic must unwind back to
    // the test thread's #[should_panic] handler, so we must NOT wrap in run_impl.

    /// Closing an already-closed channel panics.
    #[test]
    #[should_panic(expected = "close of closed channel")]
    fn close_twice_panics() {
        let (tx, _rx) = chan::<i32>(1);
        tx.close();
        tx.close();
    }

    /// Sending on a closed channel panics.
    #[test]
    #[should_panic(expected = "send on closed channel")]
    fn send_on_closed_panics() {
        let (tx, _rx) = chan::<i32>(1);
        tx.close();
        tx.send(1);
    }

    // ── Goroutine rendezvous (exercises park/unpark) ──────────────────────────

    /// Unbuffered send and recv across two goroutines.
    #[test]
    fn unbuffered_rendezvous() {
        use crate::runtime::sched::spawn_goroutine;

        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || { tx.send(99); });
            }
            assert_eq!(rx.recv(), Some(99));
        });
    }

    /// Ping-pong ten rounds across two goroutines.
    #[test]
    fn unbuffered_ping_pong() {
        use crate::runtime::sched::spawn_goroutine;

        run_impl(|| {
            let (ping_tx, ping_rx) = chan::<i32>(0);
            let (pong_tx, pong_rx) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    for _ in 0..10 {
                        let v = ping_rx.recv().unwrap();
                        pong_tx.send(v + 1);
                    }
                });
            }

            let mut n = 0_i32;
            for _ in 0..10 {
                ping_tx.send(n);
                n = pong_rx.recv().unwrap();
            }
            assert_eq!(n, 10);
        });
    }

    /// Buffered producer/consumer: 20 values summed by a goroutine.
    #[test]
    fn producer_consumer() {
        use crate::runtime::sched::spawn_goroutine;

        const N: i32 = 20;
        let sum = Arc::new(AtomicI32::new(0));
        let sum2 = Arc::clone(&sum);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(4);
            let sum3 = Arc::clone(&sum2);

            unsafe {
                spawn_goroutine(move || {
                    for i in 0..N { tx.send(i); }
                    tx.close();
                });
            }

            unsafe {
                spawn_goroutine(move || {
                    while let Some(v) = rx.recv() {
                        sum3.fetch_add(v, Ordering::Relaxed);
                    }
                });
            }

            for _ in 0..500 { crate::gosched(); }
        });

        assert_eq!(sum.load(Ordering::Acquire), N * (N - 1) / 2);
    }

    /// Close wakes a goroutine that is blocked on recv.
    #[test]
    fn close_wakes_blocked_receiver() {
        use crate::runtime::sched::spawn_goroutine;

        let got_none = Arc::new(AtomicI32::new(0));
        let got2 = Arc::clone(&got_none);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    // Block on recv until the channel is closed.
                    if rx.recv().is_none() {
                        got2.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }

            for _ in 0..20 { crate::gosched(); }
            tx.close();
            for _ in 0..20 { crate::gosched(); }
        });

        assert_eq!(got_none.load(Ordering::Acquire), 1);
    }
}