use std::any::Any;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64, Ordering::*};
use std::sync::{Arc, Mutex, OnceLock};
use super::g::{current_g, set_current_g, G, GDEAD, GRUNNABLE, GRUNNING, STACK_GUARD};
use super::m::{current_m, M};
use super::p::{GlobalRunQueue, P, PIDLE, PRUNNING};
use super::stack::{grow_stack_if_needed, stack_alloc};
#[cfg(not(windows))]
use super::stack::install_sigsegv_handler;
use super::sysmon::start_sysmon;
use super::time::start_timer_thread;
#[cfg(all(target_arch = "x86_64", not(windows)))]
use super::asm_amd64::{async_preempt_trampoline, gogo, mcall};
#[cfg(all(target_arch = "x86_64", windows))]
use super::asm_amd64::{gogo, mcall};
#[cfg(target_arch = "aarch64")]
use super::asm_arm64::{async_preempt_trampoline, gogo, mcall};
pub(crate) struct SchedInner {
pub idle_p: *mut P,
pub idle_m: *mut M,
pub nmidle: i32,
pub allp: Vec<*mut P>,
pub gomaxprocs: i32,
}
unsafe impl Send for SchedInner {}
pub(crate) struct Sched {
pub global_run_q: GlobalRunQueue,
pub nmspinning: AtomicI32,
pub gomaxprocs: AtomicI32,
pub inner: Mutex<SchedInner>,
}
unsafe impl Sync for Sched {}
static SCHED: OnceLock<Sched> = OnceLock::new();
pub(crate) fn sched() -> &'static Sched {
SCHED.get_or_init(|| Sched {
global_run_q: GlobalRunQueue::new(),
nmspinning: AtomicI32::new(0),
gomaxprocs: AtomicI32::new(1),
inner: Mutex::new(SchedInner {
idle_p: ptr::null_mut(),
idle_m: ptr::null_mut(),
nmidle: 0,
allp: Vec::new(),
gomaxprocs: 1,
}),
})
}
#[allow(clippy::never_loop)] pub(crate) unsafe fn schedule() -> ! {
let m = current_m();
debug_assert!(!m.is_null(), "schedule: CURRENT_M is null — call set_current_m first");
loop {
let p = unsafe { (*m).p };
debug_assert!(!p.is_null(), "schedule: M has no P attached");
let tick = unsafe { (*p).schedtick.load(Relaxed) };
if tick % 61 == 0 && sched().global_run_q.len() > 0 {
let gp = unsafe { sched().global_run_q.pop() };
if !gp.is_null() {
unsafe { execute(gp) }; }
}
let (gp, _inherit) = unsafe { (*p).runqget() };
let gp = if !gp.is_null() {
gp
} else {
unsafe { findrunnable() } };
unsafe { execute(gp) }; }
}
pub(crate) unsafe fn findrunnable() -> *mut G {
let m = current_m();
let sc = sched();
loop {
let p = unsafe { (*m).p };
if !p.is_null() {
let (gp, _) = unsafe { (*p).runqget() };
if !gp.is_null() {
return gp;
}
}
{
let gp = unsafe { sc.global_run_q.pop() };
if !gp.is_null() {
return gp;
}
}
{
let inner = sc.inner.lock().unwrap();
let allp = &inner.allp;
let np = allp.len();
if np > 1 && !p.is_null() {
let start = (unsafe { (*m).id as usize }).wrapping_mul(0x9e3779b9) % np;
let victim_ptrs: Vec<*mut P> = (0..4)
.map(|i| allp[(start.wrapping_add(i)) % np])
.collect();
drop(inner);
for (i, victim_ptr) in victim_ptrs.iter().enumerate() {
if *victim_ptr == p {
continue;
}
let stolen = unsafe {
(*p).runqsteal(&**victim_ptr, i == 3)
};
if !stolen.is_null() {
return stolen;
}
}
}
}
{
let ready = unsafe { super::netpoll::netpoll_wait(0) };
for gp in ready {
unsafe { super::park::goready(gp) };
}
}
unsafe { stopm() };
}
}
unsafe fn stopm() {
let m = current_m();
let sc = sched();
{
let mut inner = sc.inner.lock().unwrap();
let p = unsafe { (*m).p };
if !p.is_null() {
unsafe {
(*m).p = ptr::null_mut();
(*p).status.store(PIDLE, Release);
(*p).link = inner.idle_p;
inner.idle_p = p;
}
}
unsafe {
(*m).schedlink = inner.idle_m;
inner.idle_m = m;
inner.nmidle += 1;
}
}
unsafe { (*m).park_m() };
}
pub(crate) unsafe fn startm(p: *mut P) {
let sc = sched();
let mut inner = sc.inner.lock().unwrap();
let m = inner.idle_m;
if m.is_null() {
if !p.is_null() {
unsafe {
(*p).status.store(PIDLE, Release);
(*p).link = inner.idle_p;
inner.idle_p = p;
}
}
return;
}
unsafe {
inner.idle_m = (*m).schedlink;
(*m).schedlink = ptr::null_mut();
inner.nmidle -= 1;
}
let use_p = if !p.is_null() {
p
} else if !inner.idle_p.is_null() {
let p2 = inner.idle_p;
unsafe {
inner.idle_p = (*p2).link;
(*p2).link = ptr::null_mut();
}
p2
} else {
unsafe {
(*m).schedlink = inner.idle_m;
inner.idle_m = m;
inner.nmidle += 1;
}
return;
};
unsafe {
(*use_p).status.store(PRUNNING, Release);
(*m).p = use_p;
}
drop(inner);
unsafe { (*m).unpark() };
}
pub(crate) unsafe fn execute(gp: *mut G) -> ! {
let m = current_m();
unsafe {
(*m).curg = gp;
(*gp).m = m;
(*gp).atomicstatus.store(GRUNNING, Release);
}
let p = unsafe { (*m).p };
if !p.is_null() {
unsafe { (*p).schedtick.fetch_add(1, Relaxed) };
}
unsafe { grow_stack_if_needed(gp) };
unsafe {
set_current_g(gp);
gogo(gp)
}
}
pub(crate) unsafe extern "C" fn goexit0(gp: *mut G) {
let m = current_m();
unsafe {
(*gp).atomicstatus.store(GDEAD, Release);
(*gp).m = ptr::null_mut();
(*m).curg = ptr::null_mut();
set_current_g(ptr::null_mut());
}
unsafe { schedule() }
}
pub(crate) unsafe fn gosched() {
let gp = current_g();
debug_assert!(!gp.is_null(), "gosched: called from g0 or uninitialised thread");
unsafe { mcall(gp, gosched_m) };
}
unsafe extern "C" fn gosched_m(gp: *mut G) {
let m = current_m();
unsafe {
(*gp).atomicstatus.store(GRUNNABLE, Release);
(*gp).m = ptr::null_mut();
(*m).curg = ptr::null_mut();
set_current_g(ptr::null_mut());
}
unsafe {
(*gp).schedlink = ptr::null_mut();
sched().global_run_q.push_batch(gp, gp, 1);
}
unsafe { schedule() };
}
#[cfg(not(windows))]
static PREV_SIGURG: Mutex<Option<libc::sigaction>> = Mutex::new(None);
#[cfg(not(windows))]
pub(crate) unsafe fn install_sigurg_handler() {
let mut sa: libc::sigaction = unsafe { std::mem::zeroed() };
sa.sa_sigaction = sigurg_handler as *const () as usize;
sa.sa_flags = (libc::SA_SIGINFO | libc::SA_ONSTACK | libc::SA_RESTART) as _;
unsafe { libc::sigemptyset(&mut sa.sa_mask) };
let mut old: libc::sigaction = unsafe { std::mem::zeroed() };
let ret = unsafe { libc::sigaction(libc::SIGURG, &sa, &mut old) };
assert_eq!(ret, 0, "install_sigurg_handler: sigaction failed");
*PREV_SIGURG.lock().unwrap() = Some(old);
}
#[cfg(not(windows))]
unsafe extern "C" fn sigurg_handler(
sig: libc::c_int,
info: *mut libc::siginfo_t,
ctx: *mut libc::c_void,
) {
let gp = current_g();
if !gp.is_null() && unsafe { (*gp).preempt } {
unsafe { redirect_to_async_preempt(gp, ctx) };
return;
}
let prev = *PREV_SIGURG.lock().unwrap();
match prev {
Some(old) if old.sa_sigaction != libc::SIG_DFL
&& old.sa_sigaction != libc::SIG_IGN => {
type SaFn = unsafe extern "C" fn(libc::c_int, *mut libc::siginfo_t, *mut libc::c_void);
let f: SaFn = unsafe { std::mem::transmute(old.sa_sigaction) };
unsafe { f(sig, info, ctx) };
}
_ => {} }
}
#[cfg(not(windows))]
#[allow(unused_variables)]
unsafe fn redirect_to_async_preempt(gp: *mut G, ctx: *mut libc::c_void) {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let rip = (*uc).uc_mcontext.gregs[libc::REG_RIP as usize] as usize;
let rsp = (*uc).uc_mcontext.gregs[libc::REG_RSP as usize] as usize;
let new_rsp = rsp - 8;
*(new_rsp as *mut usize) = rip;
(*uc).uc_mcontext.gregs[libc::REG_RSP as usize] = new_rsp as libc::greg_t;
(*uc).uc_mcontext.gregs[libc::REG_RIP as usize] =
async_preempt_trampoline as usize as libc::greg_t;
}
#[cfg(all(target_os = "linux", target_arch = "aarch64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
(*uc).uc_mcontext.regs[30] = (*uc).uc_mcontext.pc;
(*uc).uc_mcontext.pc = async_preempt_trampoline as u64;
}
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &mut (*(*uc).uc_mcontext).__ss;
let rip = ss.__rip as usize;
let rsp = ss.__rsp as usize;
let new_rsp = rsp - 8;
*(new_rsp as *mut usize) = rip;
ss.__rsp = new_rsp as u64;
ss.__rip = async_preempt_trampoline as *const () as u64;
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &mut (*(*uc).uc_mcontext).__ss;
ss.__lr = ss.__pc;
ss.__pc = async_preempt_trampoline as u64;
}
}
#[unsafe(no_mangle)]
pub(crate) unsafe extern "C" fn async_preempt2() {
let gp = current_g();
if gp.is_null() {
return;
}
unsafe {
(*gp).preempt = false;
(*gp).stackguard0 = (*gp).stack.lo + STACK_GUARD;
}
unsafe { mcall(gp, preemptm) };
}
unsafe extern "C" fn preemptm(gp: *mut G) {
let m = current_m();
unsafe {
(*gp).atomicstatus.store(GRUNNABLE, Release);
(*gp).m = ptr::null_mut();
(*m).curg = ptr::null_mut();
set_current_g(ptr::null_mut());
(*gp).schedlink = ptr::null_mut();
sched().global_run_q.push_batch(gp, gp, 1);
}
unsafe { schedule() };
}
struct GoFn(Box<dyn FnOnce() + Send + 'static>);
type PanicFn = Arc<dyn Fn(Box<dyn Any + Send + 'static>) + Send + Sync + 'static>;
static PANIC_HANDLER: Mutex<Option<PanicFn>> = Mutex::new(None);
pub fn set_panic_handler<F>(f: F)
where
F: Fn(Box<dyn Any + Send + 'static>) + Send + Sync + 'static,
{
*PANIC_HANDLER.lock().unwrap() = Some(Arc::new(f));
}
fn handle_goroutine_panic(payload: Box<dyn Any + Send + 'static>) {
let handler = PANIC_HANDLER.lock().unwrap_or_else(|e| e.into_inner()).clone();
match handler {
Some(f) => f(payload),
None => {
let msg = payload.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| payload.downcast_ref::<&str>().copied())
.unwrap_or("(unknown panic payload)");
eprintln!("goroutine panicked: {msg}");
}
}
}
pub fn gomaxprocs() -> usize {
sched().gomaxprocs.load(Relaxed) as usize
}
pub fn set_gomaxprocs(n: usize) -> usize {
let n = n.clamp(1, 256) as i32;
let sc = sched();
let old = {
let mut inner = sc.inner.lock().unwrap();
let old = inner.gomaxprocs;
if n > old {
let new_ps: Vec<*mut P> = (old..n)
.map(|id| Box::into_raw(P::new(id)))
.collect();
inner.allp.extend_from_slice(&new_ps);
inner.gomaxprocs = n;
sc.gomaxprocs.store(n, Relaxed);
drop(inner);
for p_ptr in new_ps {
let id = NEXT_MID.fetch_add(1, Relaxed);
unsafe { spawn_m(id, p_ptr) };
}
} else {
inner.gomaxprocs = n;
sc.gomaxprocs.store(n, Relaxed);
}
old
};
old as usize
}
static NEXT_GOID: AtomicU64 = AtomicU64::new(1);
static NEXT_MID: AtomicI64 = AtomicI64::new(1);
static INITIALIZED: AtomicBool = AtomicBool::new(false);
unsafe extern "C" fn goroutine_entry() {
let gp = current_g();
let go_fn = unsafe {
let fn_ptr = (*gp).sched.ctxt as *mut GoFn;
(*gp).sched.ctxt = ptr::null_mut();
Box::from_raw(fn_ptr)
};
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (go_fn.0)()));
if let Err(payload) = result {
handle_goroutine_panic(payload);
}
}
#[cfg(target_arch = "x86_64")]
#[unsafe(naked)]
unsafe extern "C" fn goexit_trampoline() -> ! {
core::arch::naked_asm!(
"call {handler}",
"ud2", handler = sym goexit0_handler,
)
}
#[cfg(target_arch = "x86_64")]
unsafe extern "C" fn goexit0_handler() -> ! {
let gp = current_g();
unsafe { mcall(gp, goexit0) };
unsafe { std::hint::unreachable_unchecked() }
}
#[cfg(target_arch = "aarch64")]
unsafe extern "C" fn goexit_trampoline() -> ! {
let gp = current_g();
unsafe { mcall(gp, goexit0) };
unsafe { std::hint::unreachable_unchecked() }
}
pub(crate) fn new_goroutine(f: impl FnOnce() + Send + 'static) -> Box<G> {
let stack = unsafe { stack_alloc().expect("new_goroutine: stack_alloc failed") };
let goid = NEXT_GOID.fetch_add(1, Relaxed);
let mut g = G::new(stack, goid);
let go_fn = Box::new(GoFn(Box::new(f)));
g.sched.ctxt = Box::into_raw(go_fn) as *mut u8;
g.sched.pc = goroutine_entry as *const () as usize;
#[cfg(target_arch = "x86_64")]
{
let ret_slot = (g.stack.hi - 8) as *mut usize;
unsafe { ret_slot.write(goexit_trampoline as *const () as usize) };
g.sched.sp = g.stack.hi - 8;
g.sched.bp = 0; }
#[cfg(target_arch = "aarch64")]
{
g.sched.lr = goexit_trampoline as usize;
g.sched.sp = g.stack.hi;
g.sched.bp = 0;
}
g.atomicstatus.store(GRUNNABLE, Release);
g
}
pub(crate) unsafe fn spawn_goroutine(f: impl FnOnce() + Send + 'static) {
let g_ptr = Box::into_raw(new_goroutine(f));
unsafe {
(*g_ptr).schedlink = ptr::null_mut();
sched().global_run_q.push_batch(g_ptr, g_ptr, 1);
startm(ptr::null_mut());
}
}
pub(crate) fn schedinit(nprocs: i32) {
if INITIALIZED.swap(true, AcqRel) {
return;
}
assert!(nprocs >= 1, "schedinit: nprocs must be ≥ 1");
let nprocs = std::env::var("GOMAXPROCS")
.ok()
.and_then(|s| s.parse::<i32>().ok())
.filter(|&n| (1..=256).contains(&n))
.unwrap_or(nprocs);
let sc = sched();
let ps: Vec<*mut P> = (0..nprocs)
.map(|id| Box::into_raw(P::new(id)))
.collect();
{
let mut inner = sc.inner.lock().unwrap();
inner.gomaxprocs = nprocs;
inner.allp = ps.clone();
}
sc.gomaxprocs.store(nprocs, Relaxed);
for p_ptr in ps {
let id = NEXT_MID.fetch_add(1, Relaxed);
unsafe { spawn_m(id, p_ptr) };
}
#[cfg(not(windows))]
unsafe { install_sigsegv_handler() };
#[cfg(not(windows))]
unsafe { install_sigurg_handler() };
start_sysmon();
start_timer_thread();
}
unsafe fn spawn_m(id: i64, p: *mut P) {
let m = Box::into_raw(unsafe { M::new(id) });
unsafe {
(*m).p = p;
(*p).m = m;
(*p).status.store(PRUNNING, Release);
}
let m_addr = m as usize;
std::thread::spawn(move || {
let m = m_addr as *mut M;
unsafe {
(*m).start();
schedule();
}
});
}
pub(crate) fn run_impl<F: FnOnce() + Send + 'static>(f: F) {
let nprocs = std::thread::available_parallelism()
.map(|n| n.get() as i32)
.unwrap_or(1);
schedinit(nprocs);
let caller = std::thread::current();
let wrapper = move || {
f();
caller.unpark();
};
unsafe { spawn_goroutine(wrapper) };
std::thread::park();
}
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
#[test]
fn sched_singleton() {
let s1 = sched() as *const Sched;
let s2 = sched() as *const Sched;
assert_eq!(s1, s2, "sched() must return the same singleton");
}
#[test]
fn global_run_queue_round_trip() {
use crate::runtime::g::{Stack, G};
use crate::runtime::p::GlobalRunQueue;
let q = GlobalRunQueue::new();
let lo = 0x200000usize;
let g1 = G::new(Stack { lo, hi: lo + 65536 }, 99);
let g1_ptr = Box::into_raw(g1);
unsafe {
(*g1_ptr).schedlink = ptr::null_mut();
q.push_batch(g1_ptr, g1_ptr, 1);
assert_eq!(q.len(), 1);
let got = q.pop();
assert_eq!(got, g1_ptr);
assert_eq!(q.len(), 0);
let _ = Box::from_raw(g1_ptr);
}
}
#[test]
fn new_goroutine_fields() {
use crate::runtime::g::GRUNNABLE;
use std::sync::atomic::Ordering::Relaxed;
let g = new_goroutine(|| {});
let g_ptr = Box::into_raw(g);
unsafe {
assert_eq!(
(*g_ptr).atomicstatus.load(Relaxed),
GRUNNABLE,
"new goroutine must start as Grunnable"
);
assert_ne!((*g_ptr).sched.pc, 0, "pc must be set to goroutine_entry");
assert!(!(*g_ptr).sched.ctxt.is_null(), "ctxt must hold the closure");
#[cfg(target_arch = "x86_64")]
{
assert_eq!((*g_ptr).sched.sp, (*g_ptr).stack.hi - 8);
let ret_addr = ((*g_ptr).sched.sp as *const usize).read();
assert_eq!(ret_addr, goexit_trampoline as *const () as usize);
}
#[cfg(target_arch = "aarch64")]
{
assert_eq!((*g_ptr).sched.sp, (*g_ptr).stack.hi);
assert_eq!((*g_ptr).sched.lr, goexit_trampoline as *const () as usize);
}
let _ = Box::from_raw(g_ptr);
}
}
#[test]
fn run_single_goroutine() {
use std::sync::atomic::{AtomicBool, Ordering};
static RAN: AtomicBool = AtomicBool::new(false);
run_impl(|| {
RAN.store(true, Ordering::Release);
});
assert!(RAN.load(Ordering::Acquire), "goroutine body did not execute");
}
#[test]
fn run_second_goroutine() {
use std::sync::atomic::{AtomicUsize, Ordering};
static COUNT: AtomicUsize = AtomicUsize::new(0);
run_impl(|| { COUNT.fetch_add(1, Ordering::AcqRel); });
run_impl(|| { COUNT.fetch_add(1, Ordering::AcqRel); });
assert_eq!(COUNT.load(Ordering::Acquire), 2);
}
#[test]
fn gosched_returns_to_caller() {
use std::sync::atomic::{AtomicBool, Ordering};
static AFTER_YIELD: AtomicBool = AtomicBool::new(false);
run_impl(|| {
unsafe { gosched() };
AFTER_YIELD.store(true, Ordering::Release);
});
assert!(
AFTER_YIELD.load(Ordering::Acquire),
"execution must continue after gosched()"
);
}
#[test]
fn gosched_allows_other_goroutines_to_run() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
let flag = Arc::new(AtomicBool::new(false));
let flag_setter = Arc::clone(&flag);
run_impl(move || {
unsafe {
spawn_goroutine(move || {
flag_setter.store(true, Ordering::Release);
});
}
while !flag.load(Ordering::Acquire) {
unsafe { gosched() };
}
});
}
}