1use std::mem::{ManuallyDrop, MaybeUninit};
58use std::ptr;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61
62use crate::chan::{Hchan, Receiver, Sender};
63use crate::runtime::g::{current_g, G, WaitReason};
64use crate::runtime::park::{gopark, goready};
65use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
66
/// Case index reported by [`selectgo`] when no channel operation was ready
/// and the `default` branch was taken instead.
pub const CASE_DEFAULT: usize = usize::MAX;
73
74#[derive(Debug)]
80pub(crate) enum TryResult {
81 NotReady,
83
84 Done { ok: bool },
87
88 Handoff { gp: *mut G, ok: bool },
93
94 ClosedSend,
97}
98
99unsafe impl Send for TryResult {}
102
103#[doc(hidden)]
111pub struct SCase {
112 pub(crate) chan_ptr: *const (),
115
116 pub(crate) sg: *mut Sudog,
120
121 pub(crate) elem: *mut u8,
127
128 pub(crate) lock_fn: unsafe fn(*const ()),
132
133 pub(crate) unlock_fn: unsafe fn(*const ()),
135
136 pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,
143
144 pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),
146
147 pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
150}
151
152unsafe impl Send for SCase {}
155
156struct Lehmer(u64);
165
166impl Lehmer {
167 fn from_goid() -> Self {
168 let goid = unsafe {
169 let gp = current_g();
170 if gp.is_null() { 1 } else { (*gp).goid | 1 }
171 };
172 Lehmer(goid | 1) }
174
175 fn next_usize(&mut self, n: usize) -> usize {
177 self.0 = self.0.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
179 ((self.0 >> 33) as usize) % n
180 }
181}
182
183#[doc(hidden)]
203pub unsafe fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
204 let n = cases.len();
205
206 let mut pollorder: Vec<usize> = (0..n).collect();
208 let mut rng = Lehmer::from_goid();
209 for i in (1..n).rev() {
211 let j = rng.next_usize(i + 1);
212 pollorder.swap(i, j);
213 }
214
215 let mut lockorder: Vec<usize> = (0..n).collect();
217 lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
218 lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);
220
221 for &i in &lockorder {
223 unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
224 }
225
226 for &i in &pollorder {
228 let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
229 match result {
230 TryResult::NotReady => continue,
231
232 TryResult::Done { ok } => {
233 for &j in &lockorder {
235 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
236 }
237 return (i, ok);
238 }
239
240 TryResult::Handoff { gp, ok } => {
241 for &j in &lockorder {
243 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
244 }
245 unsafe { goready(gp) };
246 return (i, ok);
247 }
248
249 TryResult::ClosedSend => {
250 for &j in &lockorder {
251 unsafe { (cases[j].unlock_fn)(cases[j].chan_ptr) };
252 }
253 panic!("send on closed channel");
254 }
255 }
256 }
257
258 if has_default {
260 for &i in &lockorder {
261 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
262 }
263 return (CASE_DEFAULT, false);
264 }
265
266 let gp = current_g();
268 debug_assert!(!gp.is_null(), "selectgo: called from g0");
269
270 for case in cases.iter_mut() {
271 let sg = acquire_sudog();
272 unsafe {
273 (*sg).g = gp;
274 (*sg).elem = case.elem;
275 (*sg).is_select = true;
276 (*sg).success = false;
277 (*sg).c = case.chan_ptr as *mut u8;
278 }
279 case.sg = sg;
280 unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
281 }
282
283 unsafe { (*gp).selectdone.store(0, Ordering::Release) };
285 unsafe { (*gp).param = ptr::null_mut() };
286
287 for &i in &lockorder {
289 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
290 }
291
292 unsafe { gopark(WaitReason::Select) };
293
294 let sg_winner = unsafe { (*gp).param as *mut Sudog };
298 unsafe { (*gp).param = ptr::null_mut() };
299 let ok = unsafe { (*sg_winner).success };
300
301 let winner = cases
303 .iter()
304 .position(|c| c.sg == sg_winner)
305 .expect("selectgo: winning sudog not found in cases");
306
307 for &i in &lockorder {
309 unsafe { (cases[i].lock_fn)(cases[i].chan_ptr) };
310 }
311
312 for (i, case) in cases.iter_mut().enumerate() {
314 if i == winner { continue; }
315 let sg = case.sg;
316 unsafe { (case.dequeue_fn)(case.chan_ptr, sg) };
317 }
318
319 for &i in &lockorder {
321 unsafe { (cases[i].unlock_fn)(cases[i].chan_ptr) };
322 }
323
324 for case in cases.iter_mut() {
326 let sg = case.sg;
327 case.sg = ptr::null_mut();
328 unsafe {
329 (*sg).g = ptr::null_mut();
330 (*sg).elem = ptr::null_mut();
331 (*sg).c = ptr::null_mut();
332 release_sudog(sg);
333 }
334 }
335
336 (winner, ok)
337}
338
339pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
344 (*(p as *const Hchan<T>)).mutex.lock();
345}
346
347pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
348 (*(p as *const Hchan<T>)).mutex.unlock();
349}
350
351pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
352 p: *const (),
353 elem: *mut u8,
354) -> TryResult {
355 let hchan = &*(p as *const Hchan<T>);
356 let state = &mut *hchan.state.get();
357
358 if state.closed {
359 return TryResult::ClosedSend;
360 }
361
362 let recv_sg = state.recvq.dequeue();
364 if !recv_sg.is_null() {
365 let gp = (*recv_sg).g;
366 let ep = (*recv_sg).elem as *mut MaybeUninit<T>;
367 if !ep.is_null() {
368 (*ep).write(ptr::read(elem as *const T));
370 }
371 (*recv_sg).success = true;
372 (*gp).param = recv_sg as *mut u8;
373 return TryResult::Handoff { gp, ok: true };
374 }
375
376 if state.buf.len() < state.cap {
378 state.buf.push_back(ptr::read(elem as *const T));
379 return TryResult::Done { ok: true };
380 }
381
382 TryResult::NotReady
383}
384
385pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
386 p: *const (),
387 elem: *mut u8,
388) -> TryResult {
389 let hchan = &*(p as *const Hchan<T>);
390 let state = &mut *hchan.state.get();
391
392 let send_sg = state.sendq.dequeue();
394 if !send_sg.is_null() {
395 let gp = (*send_sg).g;
396 let ep = (*send_sg).elem as *mut MaybeUninit<T>; let boxed = (*send_sg).boxed_elem;
398 let val = if state.cap == 0 {
399 let v = (*ep).assume_init_read();
400 if boxed { let _ = Box::from_raw(ep); }
401 (*send_sg).elem = ptr::null_mut();
402 v
403 } else {
404 let head = state.buf.pop_front().unwrap();
405 let sv = (*ep).assume_init_read();
406 if boxed { let _ = Box::from_raw(ep); }
407 (*send_sg).elem = ptr::null_mut();
408 state.buf.push_back(sv);
409 head
410 };
411 (*(elem as *mut MaybeUninit<T>)).write(val);
412 (*send_sg).success = true;
413 (*gp).param = send_sg as *mut u8;
414 return TryResult::Handoff { gp, ok: true };
415 }
416
417 if !state.buf.is_empty() {
419 let val = state.buf.pop_front().unwrap();
420 (*(elem as *mut MaybeUninit<T>)).write(val);
421 return TryResult::Done { ok: true };
422 }
423
424 if state.closed {
426 return TryResult::Done { ok: false };
427 }
428
429 TryResult::NotReady
430}
431
432pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
433 let hchan = &*(p as *const Hchan<T>);
434 (*hchan.state.get()).sendq.enqueue(sg);
435}
436
437pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
438 let hchan = &*(p as *const Hchan<T>);
439 (*hchan.state.get()).recvq.enqueue(sg);
440}
441
442pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
443 let hchan = &*(p as *const Hchan<T>);
444 (*hchan.state.get()).sendq.dequeue_sudog(sg);
445}
446
447pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
448 let hchan = &*(p as *const Hchan<T>);
449 (*hchan.state.get()).recvq.dequeue_sudog(sg);
450}
451
452#[doc(hidden)]
464pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut MaybeUninit<T>) -> SCase {
465 SCase {
466 chan_ptr: Arc::as_ptr(rx.hchan()) as *const (),
467 sg: ptr::null_mut(),
468 elem: slot as *mut u8,
469 lock_fn: lock_chan::<T>,
470 unlock_fn: unlock_chan::<T>,
471 try_fn: try_recv_chan::<T>,
472 enqueue_fn: enqueue_recv_chan::<T>,
473 dequeue_fn: dequeue_recv_chan::<T>,
474 }
475}
476
477#[doc(hidden)]
486pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
487 SCase {
488 chan_ptr: Arc::as_ptr(tx.hchan()) as *const (),
489 sg: ptr::null_mut(),
490 elem: val as *mut u8,
491 lock_fn: lock_chan::<T>,
492 unlock_fn: unlock_chan::<T>,
493 try_fn: try_send_chan::<T>,
494 enqueue_fn: enqueue_send_chan::<T>,
495 dequeue_fn: dequeue_send_chan::<T>,
496 }
497}
498
/// Scheduler-level tests for `selectgo`.
///
/// Cases are assembled by hand from an `Hchan<i32>` and wired to the
/// production generic callbacks (`lock_chan::<i32>`, `try_recv_chan::<i32>`,
/// ...), so the tests exercise the same code paths that `recv_case_of` /
/// `send_case_of` install.
#[cfg(all(test, not(loom)))]
// Some tests call `selectgo` in an `unsafe` block nested inside the `unsafe`
// block that spawns the goroutine.
#[allow(unused_unsafe)]
mod tests {
    use super::*;
    use crate::chan::{chan, Hchan};
    use crate::runtime::sched::run_impl;
    use std::mem::MaybeUninit;
    use std::ptr;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    /// Builds a send case on `h` that sends the `i32` behind `val`.
    fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
        SCase {
            chan_ptr: Arc::as_ptr(h) as *const (),
            sg: ptr::null_mut(),
            elem: val as *mut i32 as *mut u8,
            lock_fn: lock_chan::<i32>,
            unlock_fn: unlock_chan::<i32>,
            try_fn: try_send_chan::<i32>,
            enqueue_fn: enqueue_send_chan::<i32>,
            dequeue_fn: dequeue_send_chan::<i32>,
        }
    }

    /// Builds a receive case on `h` that writes the received value to `slot`.
    fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut MaybeUninit<i32>) -> SCase {
        SCase {
            chan_ptr: Arc::as_ptr(h) as *const (),
            sg: ptr::null_mut(),
            elem: slot as *mut MaybeUninit<i32> as *mut u8,
            lock_fn: lock_chan::<i32>,
            unlock_fn: unlock_chan::<i32>,
            try_fn: try_recv_chan::<i32>,
            enqueue_fn: enqueue_recv_chan::<i32>,
            dequeue_fn: dequeue_recv_chan::<i32>,
        }
    }

    /// A buffered value is received on the fast path even when a default
    /// branch exists.
    #[test]
    fn fast_recv_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            tx.send(42);

            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };

            assert_eq!(idx, 0, "should pick recv case");
            assert!(ok, "should be ok (not closed)");
            assert_eq!(unsafe { slot.assume_init() }, 42);
        });
    }

    /// A send into a channel with free buffer space completes on the fast
    /// path.
    #[test]
    fn fast_send_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);

            let mut val = 99_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };

            assert_eq!(idx, 0);
            assert!(ok, "buffered send completes with ok=true");
            assert_eq!(rx.recv(), Some(99));
        });
    }

    /// With nothing ready, the default branch is reported.
    #[test]
    fn default_taken_when_not_ready() {
        run_impl(|| {
            let (_tx, rx) = chan::<i32>(0);

            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };

            assert_eq!(idx, CASE_DEFAULT);
            assert!(!ok);
        });
    }

    /// Receiving from a closed, empty channel completes with `ok == false`.
    #[test]
    fn recv_closed_empty() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);
            tx.close();

            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };

            assert_eq!(idx, 0);
            assert!(!ok, "recv from closed returns ok=false");
        });
    }

    /// With two receive cases and only the first channel holding a value,
    /// the first case wins regardless of poll order.
    #[test]
    fn multi_case_first_ready_wins() {
        run_impl(|| {
            let (tx1, rx1) = chan::<i32>(1);
            let (_tx2, rx2) = chan::<i32>(1);

            tx1.send(7);

            let mut s1 = MaybeUninit::<i32>::uninit();
            let mut s2 = MaybeUninit::<i32>::uninit();
            let mut cases = [
                recv_case(rx1.hchan(), &mut s1),
                recv_case(rx2.hchan(), &mut s2),
            ];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };

            assert_eq!(idx, 0);
            assert!(ok);
            assert_eq!(unsafe { s1.assume_init() }, 7);
        });
    }

    /// A select on an unbuffered channel with no default completes once
    /// another goroutine sends.
    #[test]
    fn blocking_recv_unblocked_by_send() {
        use crate::runtime::sched::spawn_goroutine;

        let result = Arc::new(AtomicI32::new(-1));
        let result2 = Arc::clone(&result);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    // Yield once before sending.
                    crate::gosched();
                    tx.send(55);
                });
            }

            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };

            assert_eq!(idx, 0);
            assert!(ok);
            result2.store(unsafe { slot.assume_init() }, Ordering::Relaxed);
        });

        assert_eq!(result.load(Ordering::Acquire), 55);
    }

    /// A select on an unbuffered channel with no default completes once
    /// another goroutine receives.
    #[test]
    fn blocking_send_unblocked_by_recv() {
        use crate::runtime::sched::spawn_goroutine;

        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);

            unsafe {
                spawn_goroutine(move || {
                    // Yield once before receiving.
                    crate::gosched();
                    let _ = rx.recv();
                });
            }

            let mut val = 77_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, _ok) = unsafe { selectgo(&mut cases, false) };

            assert_eq!(idx, 0);
        });
    }

    /// Two goroutines race to receive the single buffered value through a
    /// select with a default branch; exactly one of them must get it.
    #[test]
    fn select_race_one_winner() {
        use crate::runtime::sched::spawn_goroutine;

        let wins = Arc::new(AtomicI32::new(0));
        let wins2 = Arc::clone(&wins);
        let wins3 = Arc::clone(&wins);

        run_impl(move || {
            let (tx, rx) = chan::<i32>(1);
            tx.send(1);

            for counter in [&wins2, &wins3] {
                let wins = Arc::clone(counter);
                let rx = rx.clone();
                unsafe {
                    spawn_goroutine(move || {
                        let mut slot = MaybeUninit::<i32>::uninit();
                        let mut cases = [recv_case(rx.hchan(), &mut slot)];
                        let (idx, ok) = unsafe { selectgo(&mut cases, true) };
                        if idx == 0 && ok {
                            wins.fetch_add(1, Ordering::Relaxed);
                        }
                    });
                }
            }

            // Yield repeatedly so both spawned goroutines get to run.
            for _ in 0..200 {
                crate::gosched();
            }
        });

        assert_eq!(wins.load(Ordering::Acquire), 1);
    }
}