use std::any::Any;
use std::cell::Cell;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64, AtomicU8, Ordering::*};
use std::sync::{Arc, Mutex, OnceLock};
use super::g::{casgstatus, current_g, readgstatus, set_current_g, Stack, G, GDEAD, GPREEMPTED, GRUNNABLE, GRUNNING, STACK_GUARD};
use super::m::{current_m, M};
use super::p::{GlobalRunQueue, P, PIDLE, PRUNNING};
use super::stack::{grow_stack_if_needed, stack_alloc, stack_pool_free, GOROUTINE_STACK_BYTES};
#[cfg(not(windows))]
use super::stack::{install_sigsegv_handler, try_grow_stack_from_signal};
use super::sysmon::start_sysmon;
use super::time::start_timer_thread;
#[cfg(target_arch = "x86_64")]
use super::asm_amd64::{async_preempt_trampoline, gogo, mcall};
#[cfg(target_arch = "aarch64")]
use super::asm_arm64::{async_preempt_trampoline, gogo, mcall};
/// Scheduler bookkeeping that lives inside `Rt::inner` and is therefore only
/// reached through that mutex.
pub(crate) struct SchedInner {
/// Head of the idle-P list; entries are chained through `P::link`.
pub idle_p: *mut P,
/// Head of the idle-M list; entries are chained through `M::schedlink`.
pub idle_m: *mut M,
/// Number of Ms currently on the `idle_m` list (adjusted on every push/pop).
pub nmidle: i32,
/// Every P that has been created; `set_gomaxprocs` appends to it.
pub allp: Vec<*mut P>,
/// Current GOMAXPROCS value; mirrored in the atomic `Rt::gomaxprocs`.
pub gomaxprocs: i32,
}
// Raw pointers are not `Send` by default; this impl allows `SchedInner` to sit
// inside a `Mutex` that is shared between threads.
// NOTE(review): soundness relies on the pointed-to P/M objects only being
// touched under the scheduler's locking discipline — confirm.
unsafe impl Send for SchedInner {}
/// Per-runtime scheduler state shared by all M threads.
pub(crate) struct Rt {
/// Global queue of runnable goroutines (used by `schedule`, `gosched_m`, `preemptm`).
pub global_run_q: GlobalRunQueue,
/// Not referenced in the visible part of this file; presumably the number of
/// spinning Ms — TODO confirm.
pub nmspinning: AtomicI32,
/// Lock-free mirror of `SchedInner::gomaxprocs`, read by `gomaxprocs()`.
pub gomaxprocs: AtomicI32,
/// Idle lists and the P table, guarded by a mutex.
pub inner: Mutex<SchedInner>,
/// All live goroutines; `goexit0` removes a goroutine from it when it dies.
pub allg: Mutex<Vec<*mut G>>,
/// Set when the runtime is shutting down; polled by `findrunnable` and `stopm`.
pub shutdown: AtomicBool,
}
// `Rt` contains raw pointers (via `allg` and `SchedInner`), so `Sync`/`Send`
// must be asserted manually.
// NOTE(review): soundness depends on the locking discipline around those pointers — confirm.
unsafe impl Sync for Rt {}
unsafe impl Send for Rt {}
// Per-thread pointer to the runtime this thread belongs to. Starts out null and
// is written by `set_current_rt`.
thread_local! {
static CURRENT_RT: Cell<*const Rt> = const { Cell::new(ptr::null()) };
}
// Process-wide runtime reference, readable through `global_rt_ptr`.
// The code that initialises it is not in the visible part of this file.
static GLOBAL_RT: OnceLock<&'static Rt> = OnceLock::new();
/// Returns a raw pointer to the process-wide runtime, or null when
/// `GLOBAL_RT` has not been initialised yet.
#[inline]
pub(crate) fn global_rt_ptr() -> *const Rt {
    match GLOBAL_RT.get() {
        Some(&rt) => rt as *const Rt,
        None => ptr::null(),
    }
}
#[inline]
pub(crate) fn current_rt() -> &'static Rt {
let p = CURRENT_RT.with(|c| c.get());
debug_assert!(!p.is_null(), "current_rt: CURRENT_RT is not set on this thread");
unsafe { &*p }
}
#[inline]
pub(crate) fn current_rt_ptr() -> *const Rt {
CURRENT_RT.with(|c| c.get())
}
/// Returns the P attached to the calling thread's M, or null when the thread
/// has no M (the M's `p` field itself may also be null).
#[inline]
pub(crate) fn current_p() -> *mut P {
    let mp = super::m::current_m();
    if mp.is_null() {
        return ptr::null_mut();
    }
    // SAFETY: `mp` was just checked to be non-null and comes from `current_m`.
    unsafe { (*mp).p }
}
/// Binds `rt` as the runtime of the calling thread (stored in `CURRENT_RT`).
#[inline]
pub(crate) fn set_current_rt(rt: *const Rt) {
    CURRENT_RT.with(|slot| {
        slot.set(rt);
    });
}
/// Shorthand for [`current_rt`]: the scheduler state of the calling thread's runtime.
#[inline]
pub(crate) fn sched() -> &'static Rt {
current_rt()
}
/// Picks the next goroutine for the current M and switches to it.
///
/// Selection order: every 61st scheduler tick of the current P the global run
/// queue is tried first; otherwise the P's local run queue, and finally
/// `findrunnable`. The function only returns when `findrunnable` yields
/// `None`; every successful pick ends in `execute`, which never returns.
///
/// # Safety
/// Must be called on a thread whose current M is set and has a P attached.
pub(crate) unsafe fn schedule() {
    let mp = current_m();
    debug_assert!(!mp.is_null(), "schedule: CURRENT_M is null — call set_current_m first");
    let pp = unsafe { (*mp).p };
    debug_assert!(!pp.is_null(), "schedule: M has no P attached");

    // Periodically favour the global queue over the local one.
    let global_turn = unsafe { (*pp).schedtick.load(Relaxed) } % 61 == 0;
    if global_turn && sched().global_run_q.len() > 0 {
        let from_global = unsafe { sched().global_run_q.pop() };
        if !from_global.is_null() {
            unsafe { execute(from_global) };
        }
    }

    let (from_local, _inherit) = unsafe { (*pp).runqget() };
    if !from_local.is_null() {
        unsafe { execute(from_local) };
    }

    if let Some(found) = unsafe { findrunnable() } {
        unsafe { execute(found) };
    }
}
/// Loops until a runnable goroutine is found or shutdown is observed.
///
/// Each iteration tries, in order: the current P's local run queue, the global
/// run queue, stealing from other Ps, and the network poller; if all of those
/// come up empty the M is handed to `stopm`. Returns `None` only when the
/// `shutdown` flag is set after `stopm` returns.
///
/// # Safety
/// Must be called on a thread with a valid current M.
pub(crate) unsafe fn findrunnable() -> Option<*mut G> {
let m = current_m();
let sc = sched();
loop {
// Re-read the P every iteration: `stopm` may detach it or attach a different one.
let p = unsafe { (*m).p };
// 1. Local run queue of the attached P, if any.
if !p.is_null() {
let (gp, _) = unsafe { (*p).runqget() };
if !gp.is_null() {
return Some(gp);
}
}
// 2. Global run queue.
{
let gp = unsafe { sc.global_run_q.pop() };
if !gp.is_null() {
return Some(gp);
}
}
// 3. Work stealing from the other Ps.
{
let inner = sc.inner.lock().unwrap();
let np = inner.allp.len();
if np > 1 && !p.is_null() {
// Start the victim scan at an offset derived from the M id so that
// different Ms begin at different victims.
let start = (unsafe { (*m).id as usize }).wrapping_mul(0x9e3779b9) % np;
// Snapshot the victim order so the scheduler lock is not held while stealing.
let victim_ptrs: Vec<*mut P> = (0..np)
.map(|i| inner.allp[(start.wrapping_add(i)) % np])
.collect();
drop(inner);
// Four passes over the victims; only the last pass passes `true` as the
// second argument of `runqsteal`.
for pass in 0..4 {
let steal_run_next = pass == 3;
for &victim_ptr in &victim_ptrs {
if victim_ptr == p || victim_ptr.is_null() {
continue;
}
let stolen = unsafe {
(*p).runqsteal(&*victim_ptr, steal_run_next)
};
if !stolen.is_null() {
return Some(stolen);
}
}
}
}
}
// 4. Poll the network and make every ready goroutine runnable.
// NOTE(review): the `0` argument is presumably a zero timeout (non-blocking poll) — confirm.
{
let ready = unsafe { super::netpoll::netpoll_wait(0) };
for gp in ready {
unsafe { super::park::goready(gp) };
}
}
// 5. Nothing found: try to park this M, then re-check for shutdown and retry.
unsafe { stopm() };
if sc.shutdown.load(Acquire) {
return None;
}
}
}
/// Parks the current M when there is no work for it.
///
/// Under the scheduler lock it: returns early if the attached P still has
/// local work or shutdown is set; otherwise releases the P to the idle-P list
/// and puts the M on the idle-M list; then re-checks for work and, if some
/// appeared, undoes the idling and returns. Only when all checks pass does it
/// release the lock and call `park_m`.
///
/// # Safety
/// Must be called on a thread with a valid current M.
unsafe fn stopm() {
let m = current_m();
let sc = sched();
{
let mut inner = sc.inner.lock().unwrap();
let surrendered_p = unsafe { (*m).p };
// Work arrived on our own P in the meantime: keep running.
if !surrendered_p.is_null() && unsafe { (*surrendered_p).runq_size() } > 0 {
return; }
if sc.shutdown.load(Acquire) {
return;
}
// Detach the P and push it onto the idle-P list.
if !surrendered_p.is_null() {
unsafe {
(*m).p = ptr::null_mut();
(*surrendered_p).status.store(PIDLE, Release);
(*surrendered_p).link = inner.idle_p;
inner.idle_p = surrendered_p;
}
}
// Push this M onto the idle-M list.
unsafe {
(*m).schedlink = inner.idle_m;
inner.idle_m = m;
inner.nmidle += 1;
}
// Re-check for work after publishing ourselves as idle, still under the lock.
let global_work = sc.global_run_q.len() > 0;
let local_work = inner.allp.iter().any(|&p| {
!p.is_null() && unsafe { (*p).runq_size() } > 0
});
if global_work || local_work {
// Try to take an idle P (possibly the one just surrendered).
let mut p2 = ptr::null_mut();
if !inner.idle_p.is_null() {
p2 = inner.idle_p;
unsafe {
inner.idle_p = (*p2).link;
(*p2).link = ptr::null_mut();
}
}
if !p2.is_null() || global_work {
// Pop this M off the idle list again; it is still the head because the
// lock has been held since it was pushed.
unsafe {
inner.idle_m = (*m).schedlink;
(*m).schedlink = ptr::null_mut();
inner.nmidle -= 1;
}
if !p2.is_null() {
unsafe {
(*p2).status.store(PRUNNING, Release);
(*m).p = p2;
}
}
// Note: with `global_work` but no idle P this returns with `(*m).p` null.
return;
}
}
// Shutdown was requested while we were idling: undo the idle-M push and return.
if sc.shutdown.load(Acquire) {
unsafe {
inner.idle_m = (*m).schedlink;
(*m).schedlink = ptr::null_mut();
inner.nmidle -= 1;
}
return;
}
}
// Lock released: block until another thread unparks this M (see `startm`).
unsafe { (*m).park_m() };
}
/// Wakes an idle M and gives it a P to run.
///
/// * `p` — the P to hand over, or null to take one from the idle-P list.
///
/// If no idle M exists, a non-null `p` is put on the idle-P list instead. If
/// `p` is null and no idle P exists, the M is pushed back onto the idle list.
/// Otherwise the P is marked `PRUNNING`, attached to the M, and the M is
/// unparked after the scheduler lock has been released.
///
/// # Safety
/// `p` must be null or point to a valid P that is not attached to any M.
pub(crate) unsafe fn startm(p: *mut P) {
let sc = sched();
let mut inner = sc.inner.lock().unwrap();
let m = inner.idle_m;
if m.is_null() {
// No M available: park the P on the idle list so a later caller can pick it up.
if !p.is_null() {
unsafe {
(*p).status.store(PIDLE, Release);
(*p).link = inner.idle_p;
inner.idle_p = p;
}
}
return;
}
// Pop the M from the idle-M list.
unsafe {
inner.idle_m = (*m).schedlink;
(*m).schedlink = ptr::null_mut();
inner.nmidle -= 1;
}
let use_p = if !p.is_null() {
p
} else if !inner.idle_p.is_null() {
let p2 = inner.idle_p;
unsafe {
inner.idle_p = (*p2).link;
(*p2).link = ptr::null_mut();
}
p2
} else {
// No P to give: put the M back on the idle list and give up.
unsafe {
(*m).schedlink = inner.idle_m;
inner.idle_m = m;
inner.nmidle += 1;
}
return;
};
unsafe {
(*use_p).status.store(PRUNNING, Release);
(*m).p = use_p;
}
// Release the lock before waking the M.
drop(inner);
unsafe { (*m).unpark() };
}
/// Switches the current M to run goroutine `gp`; never returns.
///
/// Binds `gp` and the M to each other, moves `gp` from `GRUNNABLE` to
/// `GRUNNING`, bumps the attached P's `schedtick`, and jumps into the
/// goroutine's saved context via `gogo`.
///
/// # Safety
/// `gp` must point to a valid goroutine in state `GRUNNABLE` with a valid
/// saved context, and the calling thread must have a current M.
pub(crate) unsafe fn execute(gp: *mut G) -> ! {
let m = current_m();
unsafe {
(*m).curg = gp;
(*gp).m = m;
casgstatus(gp, GRUNNABLE, GRUNNING);
}
let p = unsafe { (*m).p };
if !p.is_null() {
unsafe { (*p).schedtick.fetch_add(1, Relaxed) };
}
// Debug builds only: sanity-check the saved context before jumping to it and
// abort with a diagnostic if it looks corrupt.
#[cfg(debug_assertions)]
unsafe {
let sp = (*gp).sched.sp;
let bp = (*gp).sched.bp;
let pc = (*gp).sched.pc;
let (lo, hi) = ((*gp).stack.lo, (*gp).stack.hi);
// SP must lie inside the goroutine stack and be 8-byte aligned; BP may be 0
// or inside the stack; PC must be non-zero.
let sp_ok = sp >= lo && sp <= hi && sp & 7 == 0;
let bp_ok = bp == 0 || (bp >= lo && bp <= hi);
if !sp_ok || !bp_ok || pc == 0 {
let status = (*gp).atomicstatus.load(std::sync::atomic::Ordering::Relaxed);
eprintln!(
"execute: corrupt gobuf: gp={gp:p} goid={} status={status} sp={sp:#x} \
bp={bp:#x} pc={pc:#x} stack=[{lo:#x},{hi:#x}]",
(*gp).goid,
);
std::process::abort();
}
}
unsafe { grow_stack_if_needed(gp) };
unsafe {
set_current_g(gp);
gogo(gp)
}
}
pub(crate) unsafe extern "C" fn goexit0(gp: *mut G) {
#[cfg(not(windows))]
unsafe { super::m::unblock_sigurg() };
let m = current_m();
unsafe {
casgstatus(gp, GRUNNING, GDEAD);
(*gp).m = ptr::null_mut();
(*m).curg = ptr::null_mut();
set_current_g(ptr::null_mut());
}
unsafe { (*m).locks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) };
{
let mut allg = sched().allg.lock().unwrap();
if let Some(pos) = allg.iter().position(|&p| p == gp) {
allg.swap_remove(pos);
}
}
unsafe { gfree_put(gp) };
unsafe { schedule() };
}
/// Voluntarily yields the processor: blocks SIGURG (non-Windows) and switches
/// via `mcall` to `gosched_m`, which requeues the goroutine.
///
/// # Safety
/// Must be called from a running goroutine, not from g0 or a foreign thread.
pub(crate) unsafe fn gosched() {
    #[cfg(not(windows))]
    unsafe { super::m::block_sigurg() };

    let running = current_g();
    debug_assert!(!running.is_null(), "gosched: called from g0 or uninitialised thread");
    unsafe {
        mcall(running, gosched_m);
    }
}
/// `mcall` target for `gosched`: marks `gp` runnable, detaches it from the M,
/// appends it to the global run queue, and reschedules.
unsafe extern "C" fn gosched_m(gp: *mut G) {
    #[cfg(not(windows))]
    unsafe { super::m::unblock_sigurg() };

    let mp = current_m();
    unsafe {
        casgstatus(gp, GRUNNING, GRUNNABLE);
        (*gp).m = ptr::null_mut();
        (*mp).curg = ptr::null_mut();
        set_current_g(ptr::null_mut());

        // Single-element batch: head == tail == gp.
        (*gp).schedlink = ptr::null_mut();
        sched().global_run_q.push_batch(gp, gp, 1);

        schedule();
    }
}
/// SIGURG disposition that was in place before `install_sigurg_handler` ran;
/// `sigurg_handler` forwards to it when the signal is not a preemption request.
#[cfg(not(windows))]
static PREV_SIGURG: Mutex<Option<libc::sigaction>> = Mutex::new(None);
/// Minimum number of bytes that must remain between the interrupted stack
/// pointer and the bottom of the goroutine stack for async preemption to proceed.
#[cfg(any(not(windows), target_arch = "x86_64"))]
const ASYNC_PREEMPT_HEADROOM: usize = 4096;
/// Installs `sigurg_handler` for SIGURG, remembers the previous disposition in
/// `PREV_SIGURG`, and records the address of `goroutine_entry` as the text anchor
/// used by `pc_in_our_text`.
///
/// # Panics
/// Panics if `sigaction` fails.
#[cfg(not(windows))]
pub(crate) unsafe fn install_sigurg_handler() {
    let mut previous: libc::sigaction = unsafe { std::mem::zeroed() };
    let mut action: libc::sigaction = unsafe { std::mem::zeroed() };
    action.sa_sigaction = sigurg_handler as *const () as usize;
    action.sa_flags = (libc::SA_SIGINFO | libc::SA_ONSTACK | libc::SA_RESTART) as _;
    unsafe { libc::sigemptyset(&mut action.sa_mask) };

    let rc = unsafe { libc::sigaction(libc::SIGURG, &action, &mut previous) };
    assert_eq!(rc, 0, "install_sigurg_handler: sigaction failed");

    {
        let mut saved = PREV_SIGURG.lock().unwrap();
        *saved = Some(previous);
    }

    let anchor = goroutine_entry as *const () as usize;
    OUR_TEXT_ANCHOR.store(anchor, std::sync::atomic::Ordering::Relaxed);
}
/// Reads the interrupted stack pointer out of a signal `ucontext_t`.
///
/// Supports Linux and macOS on x86_64 and aarch64; on any other target the
/// function falls through and returns 0.
#[cfg(not(windows))]
#[inline]
fn ucontext_sp(ctx: *mut libc::c_void) -> usize {
let uc = ctx as *mut libc::ucontext_t;
unsafe {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
return (*uc).uc_mcontext.gregs[libc::REG_RSP as usize] as usize;
#[cfg(all(target_os = "linux", target_arch = "aarch64"))]
return (*uc).uc_mcontext.sp as usize;
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
return (*(*uc).uc_mcontext).__ss.__rsp as usize;
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
return (*(*uc).uc_mcontext).__ss.__sp as usize;
// Only reached on targets not covered by the cfg branches above.
#[allow(unreachable_code)]
0_usize
}
}
/// Reads the interrupted program counter out of a signal `ucontext_t`.
///
/// Supports Linux and macOS on x86_64 and aarch64; on any other target the
/// function falls through and returns 0.
#[cfg(not(windows))]
#[inline]
fn ucontext_pc(ctx: *mut libc::c_void) -> usize {
let uc = ctx as *mut libc::ucontext_t;
unsafe {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
return (*uc).uc_mcontext.gregs[libc::REG_RIP as usize] as usize;
#[cfg(all(target_os = "linux", target_arch = "aarch64"))]
return (*uc).uc_mcontext.pc as usize;
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
return (*(*uc).uc_mcontext).__ss.__rip as usize;
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
return (*(*uc).uc_mcontext).__ss.__pc as usize;
// Only reached on targets not covered by the cfg branches above.
#[allow(unreachable_code)]
0_usize
}
}
/// Half-width of the address window around `OUR_TEXT_ANCHOR` that is treated as
/// "our" code.
#[cfg(any(not(windows), target_arch = "x86_64"))]
const TEXT_RANGE_BYTES: usize = 256 * 1024 * 1024;
/// Address of a known function in this binary (0 until a handler installer sets it).
#[cfg(any(not(windows), target_arch = "x86_64"))]
static OUR_TEXT_ANCHOR: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Reports whether `pc` lies strictly within `TEXT_RANGE_BYTES` of the anchor.
///
/// While the anchor is still unset (0) every address is accepted.
#[cfg(any(not(windows), target_arch = "x86_64"))]
#[inline]
fn pc_in_our_text(pc: usize) -> bool {
    match OUR_TEXT_ANCHOR.load(std::sync::atomic::Ordering::Relaxed) {
        0 => true,
        anchor => pc.abs_diff(anchor) < TEXT_RANGE_BYTES,
    }
}
/// Reports whether `pc` is within `ASM_FN_MAX_BYTES` (in either direction) of
/// the start of any scheduler entry point that must not be async-preempted.
#[cfg(any(not(windows), target_arch = "x86_64"))]
#[inline]
fn pc_in_scheduler_asm(pc: usize) -> bool {
    const ASM_FN_MAX_BYTES: usize = 4096;

    #[cfg(target_arch = "x86_64")]
    let entry_points: [usize; 10] = [
        super::asm_amd64::async_preempt_trampoline as *const () as usize,
        super::asm_amd64::gogo as *const () as usize,
        super::asm_amd64::mcall as *const () as usize,
        super::asm_amd64::gogo_asm as *const () as usize,
        super::asm_amd64::mcall_asm as *const () as usize,
        goexit_trampoline as *const () as usize,
        goexit0_handler as *const () as usize,
        super::m::current_m as *const () as usize,
        async_preempt2 as *const () as usize,
        goroutine_entry as *const () as usize,
    ];
    #[cfg(target_arch = "aarch64")]
    let entry_points: [usize; 9] = [
        super::asm_arm64::async_preempt_trampoline as *const () as usize,
        super::asm_arm64::gogo as *const () as usize,
        super::asm_arm64::mcall as *const () as usize,
        super::asm_arm64::gogo_asm as *const () as usize,
        super::asm_arm64::mcall_asm as *const () as usize,
        goexit_trampoline as *const () as usize,
        super::m::current_m as *const () as usize,
        async_preempt2 as *const () as usize,
        goroutine_entry as *const () as usize,
    ];

    entry_points
        .iter()
        .any(|&entry| pc.abs_diff(entry) < ASM_FN_MAX_BYTES)
}
/// SIGURG handler used for asynchronous goroutine preemption.
///
/// If the interrupted thread is running a goroutine whose `preempt` flag is
/// set, the handler checks that the interruption point is safe and, if so,
/// rewrites the signal context so the thread resumes in
/// `async_preempt_trampoline`. When any check fails the signal is dropped.
///
/// If the signal is not a preemption request, it is forwarded to whatever
/// handler was installed before ours (see `PREV_SIGURG`).
///
/// The previous disposition is read with `try_lock` rather than `lock`:
/// a signal handler must never block on a mutex, because the interrupted
/// thread itself may be the one holding it (e.g. inside
/// `install_sigurg_handler`), which would deadlock. A poisoned mutex is
/// tolerated instead of panicking across the `extern "C"` boundary.
#[cfg(not(windows))]
unsafe extern "C" fn sigurg_handler(
    sig: libc::c_int,
    info: *mut libc::siginfo_t,
    ctx: *mut libc::c_void,
) {
    let gp = current_g();
    if !gp.is_null() && unsafe { (*gp).preempt } {
        // The M holds runtime locks: not a safe point.
        let mp = super::m::current_m();
        if !mp.is_null()
            && unsafe { (*mp).locks.load(std::sync::atomic::Ordering::Relaxed) } > 0
        {
            return;
        }
        // Interrupted outside our own code (e.g. in a foreign library).
        let pc = ucontext_pc(ctx);
        if !pc_in_our_text(pc) {
            return;
        }
        if unsafe { readgstatus(gp) } != GRUNNING {
            return;
        }
        // The stack pointer must be on the goroutine stack, leave enough
        // headroom for the injected frame, and be 8-byte aligned.
        let sp = ucontext_sp(ctx);
        let (lo, hi) = unsafe { ((*gp).stack.lo, (*gp).stack.hi) };
        if sp < lo || sp > hi {
            return;
        }
        if sp < lo + ASYNC_PREEMPT_HEADROOM {
            return;
        }
        if sp & 7 != 0 {
            return;
        }
        // Never preempt inside the context-switch routines themselves.
        if pc_in_scheduler_asm(pc) {
            return;
        }
        unsafe { redirect_to_async_preempt(gp, ctx) };
        return;
    }

    // Not ours: chain to the previously installed handler, if there was one.
    // The guard is released before the old handler is invoked.
    let prev = match PREV_SIGURG.try_lock() {
        Ok(guard) => *guard,
        Err(std::sync::TryLockError::Poisoned(poisoned)) => *poisoned.into_inner(),
        // Lock is busy (the installer is mid-update): skip chaining rather than block.
        Err(std::sync::TryLockError::WouldBlock) => None,
    };
    match prev {
        Some(old) if old.sa_sigaction != libc::SIG_DFL
            && old.sa_sigaction != libc::SIG_IGN => {
            type SaFn = unsafe extern "C" fn(libc::c_int, *mut libc::siginfo_t, *mut libc::c_void);
            let f: SaFn = unsafe { std::mem::transmute(old.sa_sigaction) };
            unsafe { f(sig, info, ctx) };
        }
        _ => {}
    }
}
/// Rewrites the interrupted thread's signal context so that, when the signal
/// handler returns, execution continues in `async_preempt_trampoline` with
/// enough information saved to later resume at the interrupted instruction.
///
/// `gp` is unused in the body; only `ctx` is modified. On targets not covered
/// by the cfg blocks below the function does nothing.
///
/// # Safety
/// `ctx` must be the `ucontext_t` passed to a signal handler, and the
/// interrupted stack must have room for the pushed frame (the caller checks
/// `ASYNC_PREEMPT_HEADROOM`).
#[cfg(not(windows))]
#[allow(unused_variables)]
unsafe fn redirect_to_async_preempt(gp: *mut G, ctx: *mut libc::c_void) {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let rip = (*uc).uc_mcontext.gregs[libc::REG_RIP as usize] as usize;
let rsp = (*uc).uc_mcontext.gregs[libc::REG_RSP as usize] as usize;
// Skip 128 bytes below RSP (presumably the System V red zone — confirm) and
// push the interrupted RIP as if the trampoline had been `call`ed.
let new_rsp = rsp - 128 - 8;
*(new_rsp as *mut usize) = rip;
(*uc).uc_mcontext.gregs[libc::REG_RSP as usize] = new_rsp as libc::greg_t;
(*uc).uc_mcontext.gregs[libc::REG_RIP as usize] =
async_preempt_trampoline as *const () as usize as libc::greg_t;
}
#[cfg(all(target_os = "linux", target_arch = "aarch64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
// Save the old link register (x30) on the stack, then make LR the
// interrupted PC so the trampoline can return to it.
let sp = ((*uc).uc_mcontext.sp - 16) as *mut u64; *sp = (*uc).uc_mcontext.regs[30];
(*uc).uc_mcontext.sp = sp as u64;
(*uc).uc_mcontext.regs[30] = (*uc).uc_mcontext.pc;
(*uc).uc_mcontext.pc = async_preempt_trampoline as u64;
}
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &mut (*(*uc).uc_mcontext).__ss;
let rip = ss.__rip as usize;
let rsp = ss.__rsp as usize;
// Same scheme as the Linux x86_64 branch above.
let new_rsp = rsp - 128 - 8;
*(new_rsp as *mut usize) = rip;
ss.__rsp = new_rsp as u64;
ss.__rip = async_preempt_trampoline as *const () as u64;
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &mut (*(*uc).uc_mcontext).__ss;
// Same scheme as the Linux aarch64 branch above.
let sp = (ss.__sp - 16) as *mut u64; *sp = ss.__lr;
ss.__sp = sp as u64;
ss.__lr = ss.__pc;
ss.__pc = async_preempt_trampoline as u64;
}
}
/// Rust half of the async-preemption trampoline.
///
/// Blocks SIGURG (non-Windows), and if the thread is running a goroutine in
/// state `GRUNNING`, clears its `preempt` flag, resets `stackguard0`, and
/// switches to `preemptm` via `mcall`. Otherwise it unblocks SIGURG again and
/// returns to the interrupted code.
#[unsafe(no_mangle)]
pub(crate) unsafe extern "C" fn async_preempt2() {
    #[cfg(not(windows))]
    unsafe { super::m::block_sigurg() };

    let running = current_g();
    // Short-circuit keeps `readgstatus` from being called on a null goroutine.
    let not_preemptible = running.is_null() || unsafe { readgstatus(running) } != GRUNNING;
    if not_preemptible {
        #[cfg(not(windows))]
        unsafe { super::m::unblock_sigurg() };
        return;
    }

    unsafe {
        (*running).preempt = false;
        (*running).stackguard0 = (*running).stack.lo + STACK_GUARD;
        mcall(running, preemptm);
    }
}
/// `mcall` target for async preemption: transitions `gp` through
/// `GPREEMPTED` to `GRUNNABLE`, detaches it from the M, appends it to the
/// global run queue, and reschedules.
unsafe extern "C" fn preemptm(gp: *mut G) {
    #[cfg(not(windows))]
    unsafe { super::m::unblock_sigurg() };

    let mp = current_m();

    // GRUNNING -> GPREEMPTED -> GRUNNABLE
    unsafe {
        casgstatus(gp, GRUNNING, GPREEMPTED);
        casgstatus(gp, GPREEMPTED, GRUNNABLE);
    }

    // Unbind goroutine and M from each other.
    unsafe {
        (*gp).m = ptr::null_mut();
        (*mp).curg = ptr::null_mut();
        set_current_g(ptr::null_mut());
    }

    // Single-element batch: head == tail == gp.
    unsafe {
        (*gp).schedlink = ptr::null_mut();
        sched().global_run_q.push_batch(gp, gp, 1);
    }

    unsafe { schedule() };
}
/// SIGBUS disposition that was in place before `install_sigbus_handler` ran.
/// It is stored but not read anywhere in the visible part of this file.
#[cfg(not(windows))]
static PREV_SIGBUS: Mutex<Option<libc::sigaction>> = Mutex::new(None);
/// Installs `sigbus_handler` for SIGBUS and remembers the previous disposition
/// in `PREV_SIGBUS`.
///
/// # Panics
/// Panics if `sigaction` fails.
#[cfg(not(windows))]
pub(crate) unsafe fn install_sigbus_handler() {
    let mut previous: libc::sigaction = unsafe { std::mem::zeroed() };
    let mut action: libc::sigaction = unsafe { std::mem::zeroed() };
    action.sa_sigaction = sigbus_handler as *const () as usize;
    action.sa_flags = (libc::SA_SIGINFO | libc::SA_ONSTACK) as _;
    unsafe { libc::sigemptyset(&mut action.sa_mask) };

    let rc = unsafe { libc::sigaction(libc::SIGBUS, &action, &mut previous) };
    assert_eq!(rc, 0, "install_sigbus_handler: sigaction failed");

    let mut saved = PREV_SIGBUS.lock().unwrap();
    *saved = Some(previous);
}
/// SIGBUS handler.
///
/// First offers the fault to `try_grow_stack_from_signal`; if that reports the
/// fault as handled, the handler returns and the faulting code resumes.
/// Otherwise it writes a diagnostic dump (fault address, goroutine and g0 stack
/// bounds, selected registers per platform) to file descriptor 2 using raw
/// `write` calls, and aborts the process.
#[cfg(not(windows))]
unsafe extern "C" fn sigbus_handler(
_sig: libc::c_int,
info: *mut libc::siginfo_t,
ctx: *mut libc::c_void,
) {
if !info.is_null() {
let fault_addr = unsafe { (*info).si_addr() } as usize;
if unsafe { try_grow_stack_from_signal(fault_addr, ctx) } {
return; }
}
// Writes raw bytes to fd 2 via libc::write; no formatting machinery or allocation.
#[inline(always)]
unsafe fn sig_write(msg: &[u8]) {
unsafe { libc::write(2, msg.as_ptr() as *const libc::c_void, msg.len()) };
}
// Writes `label` followed by `val` as "0x" + 16 hex digits + newline.
#[inline(always)]
unsafe fn sig_hex(label: &[u8], val: u64) {
unsafe { sig_write(label) };
const H: &[u8] = b"0123456789abcdef";
let mut buf = [b'0'; 19]; buf[0] = b'0'; buf[1] = b'x';
// Least-significant nibble goes to the rightmost digit position (index 17).
for i in 0..16usize { buf[17 - i] = H[((val >> (i * 4)) & 0xf) as usize]; }
buf[18] = b'\n';
unsafe { sig_write(&buf) };
}
if !info.is_null() {
let fault_addr = unsafe { (*info).si_addr() } as u64;
unsafe { sig_hex(b"[go-lib SIGBUS] fault_addr = ", fault_addr) };
}
// Stack bounds of the goroutine that was running, if any.
let gp = super::g::current_g();
if !gp.is_null() {
let lo = unsafe { (*gp).stack.lo } as u64;
let hi = unsafe { (*gp).stack.hi } as u64;
unsafe { sig_hex(b"[go-lib SIGBUS] stack.lo = ", lo) };
unsafe { sig_hex(b"[go-lib SIGBUS] stack.hi = ", hi) };
}
unsafe { sig_write(b"[go-lib SIGBUS] crash (non-stack fault)\n") };
// Per-platform register dump taken from the signal context.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
if !ctx.is_null() {
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &(*(*uc).uc_mcontext).__ss;
sig_hex(b"[go-lib SIGBUS] PC = ", ss.__pc);
sig_hex(b"[go-lib SIGBUS] LR = ", ss.__lr);
sig_hex(b"[go-lib SIGBUS] SP = ", ss.__sp);
sig_hex(b"[go-lib SIGBUS] FP = ", ss.__fp);
}
}
#[cfg(all(target_os = "linux", target_arch = "aarch64"))]
if !ctx.is_null() {
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let mc = &(*uc).uc_mcontext;
sig_hex(b"[go-lib SIGBUS] PC = ", mc.pc);
sig_hex(b"[go-lib SIGBUS] SP = ", mc.sp);
sig_hex(b"[go-lib SIGBUS] LR = ", mc.regs[30]);
}
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
if !ctx.is_null() {
unsafe {
let uc = ctx as *mut libc::ucontext_t;
sig_hex(b"[go-lib SIGBUS] RIP = ", (*uc).uc_mcontext.gregs[libc::REG_RIP as usize] as u64);
sig_hex(b"[go-lib SIGBUS] RSP = ", (*uc).uc_mcontext.gregs[libc::REG_RSP as usize] as u64);
}
}
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
if !ctx.is_null() {
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &(*(*uc).uc_mcontext).__ss;
sig_hex(b"[go-lib SIGBUS] RIP = ", ss.__rip);
sig_hex(b"[go-lib SIGBUS] RSP = ", ss.__rsp);
sig_hex(b"[go-lib SIGBUS] RAX = ", ss.__rax);
sig_hex(b"[go-lib SIGBUS] RBX = ", ss.__rbx);
sig_hex(b"[go-lib SIGBUS] RCX = ", ss.__rcx);
sig_hex(b"[go-lib SIGBUS] RDX = ", ss.__rdx);
sig_hex(b"[go-lib SIGBUS] RSI = ", ss.__rsi);
sig_hex(b"[go-lib SIGBUS] RDI = ", ss.__rdi);
sig_hex(b"[go-lib SIGBUS] RBP = ", ss.__rbp);
sig_hex(b"[go-lib SIGBUS] R12 = ", ss.__r12);
sig_hex(b"[go-lib SIGBUS] R13 = ", ss.__r13);
sig_hex(b"[go-lib SIGBUS] R14 = ", ss.__r14);
sig_hex(b"[go-lib SIGBUS] R15 = ", ss.__r15);
// Also print the word just below RSP.
// NOTE(review): this dereferences an address taken from the crashed context
// and could itself fault — confirm that is acceptable for a crash dump.
let rsp = ss.__rsp as usize;
if rsp >= 8 {
let below = *((rsp - 8) as *const u64);
sig_hex(b"[go-lib SIGBUS] [RSP-8] = ", below);
}
}
}
// Stack bounds of the M's g0, if this thread has an M.
let mp = super::m::current_m();
if !mp.is_null() {
let g0 = unsafe { (*mp).g0 };
if !g0.is_null() {
let g0lo = unsafe { (*g0).stack.lo } as u64;
let g0hi = unsafe { (*g0).stack.hi } as u64;
unsafe { sig_hex(b"[go-lib SIGBUS] g0.stack.lo = ", g0lo) };
unsafe { sig_hex(b"[go-lib SIGBUS] g0.stack.hi = ", g0hi) };
#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
if !ctx.is_null() {
unsafe {
let uc = ctx as *mut libc::ucontext_t;
let ss = &(*(*uc).uc_mcontext).__ss;
let rsp = ss.__rsp as usize;
let hi = g0hi as usize;
// Dump up to 20 eight-byte words starting at RSP-8, capped at g0's stack top.
let start = if rsp >= 8 { rsp - 8 } else { rsp };
let end = hi.min(start + 20 * 8);
let mut addr = start;
sig_write(b"[go-lib SIGBUS] --- g0 stack scan (RSP-8 .. g0.hi) ---\n");
while addr < end {
sig_hex(b"[go-lib SIGBUS] @", addr as u64);
let val = *(addr as *const u64);
sig_hex(b"[go-lib SIGBUS] = ", val);
addr += 8;
}
}
}
}
} else {
unsafe { sig_write(b"[go-lib SIGBUS] current_m = NULL\n") };
}
unsafe { libc::abort() };
}
/// Raw kernel32 bindings used by `preempt_m_windows` to suspend a thread and
/// read/modify its register context. Handles and CONTEXT buffers are passed as
/// untyped byte pointers.
#[cfg(all(windows, target_arch = "x86_64"))]
mod win_suspend_sys {
#[link(name = "kernel32")]
unsafe extern "system" {
pub fn SuspendThread(hThread: *mut u8) -> u32;
pub fn ResumeThread(hThread: *mut u8) -> u32;
pub fn GetThreadContext(hThread: *mut u8, lpContext: *mut u8) -> i32;
pub fn SetThreadContext(hThread: *mut u8, lpContext: *const u8) -> i32;
}
}
/// 16-byte-aligned, 1280-byte scratch buffer that `preempt_m_windows` passes to
/// `GetThreadContext` / `SetThreadContext` in place of a typed `CONTEXT`.
/// NOTE(review): size and alignment presumably match the x86-64 `CONTEXT`
/// structure's requirements — confirm against the Windows SDK.
#[cfg(all(windows, target_arch = "x86_64"))]
#[repr(C, align(16))]
struct CtxBuf([u8; 1280]);
/// Decides whether the suspended thread of `mp`, stopped at `rip` with stack
/// pointer `rsp`, is at a point where async preemption may be injected.
///
/// Mirrors the checks in the Unix `sigurg_handler`: the goroutine must have
/// requested preemption, the M must hold no locks, the PC must be in our text
/// but outside the scheduler's context-switch routines, the goroutine must be
/// `GRUNNING`, and the stack pointer must be on the goroutine stack, 8-byte
/// aligned, with at least `ASYNC_PREEMPT_HEADROOM` bytes of room.
#[cfg(all(windows, target_arch = "x86_64"))]
unsafe fn windows_should_preempt(mp: *mut M, rip: usize, rsp: usize) -> bool {
    let gp = unsafe { (*mp).curg };
    let preempt_requested = !gp.is_null() && unsafe { (*gp).preempt };
    if !preempt_requested {
        return false;
    }

    let at_safe_code = !(unsafe { (*mp).locks.load(Relaxed) } > 0)
        && pc_in_our_text(rip)
        && unsafe { readgstatus(gp) } == GRUNNING;
    if !at_safe_code {
        return false;
    }

    let (lo, hi) = unsafe { ((*gp).stack.lo, (*gp).stack.hi) };
    let on_goroutine_stack = rsp >= lo && rsp <= hi;
    on_goroutine_stack
        && rsp >= lo + ASYNC_PREEMPT_HEADROOM
        && rsp & 7 == 0
        && !pc_in_scheduler_asm(rip)
}
/// Windows implementation of async preemption for the thread backing `mp`.
///
/// Suspends the thread, reads its control registers, and — if
/// `windows_should_preempt` approves — pushes the interrupted RIP onto the
/// thread's stack and redirects RIP to `async_preempt_trampoline` before
/// resuming the thread.
///
/// If `SuspendThread` fails, the function returns without touching the thread:
/// reading or rewriting the context of a thread that is still running would
/// yield stale registers and could corrupt its execution.
///
/// # Safety
/// `mp` must point to a valid M whose `thread_handle` is either null or a
/// handle with suspend/get-context/set-context access.
#[cfg(all(windows, target_arch = "x86_64"))]
pub(crate) unsafe fn preempt_m_windows(mp: *mut M) {
    const CONTEXT_AMD64: u32 = 0x0010_0000;
    const CONTEXT_CONTROL: u32 = CONTEXT_AMD64 | 0x1;
    // Byte offsets of ContextFlags, Rsp and Rip inside the x86-64 CONTEXT.
    const OFF_FLAGS: usize = 48;
    const OFF_RSP: usize = 152;
    const OFF_RIP: usize = 248;
    // SuspendThread returns (DWORD)-1 on failure.
    const SUSPEND_FAILED: u32 = u32::MAX;

    let h = unsafe { (*mp).thread_handle } as *mut u8;
    if h.is_null() {
        return;
    }

    let mut buf = CtxBuf([0u8; 1280]);
    let base = buf.0.as_mut_ptr();
    // Request only the control registers (RIP, RSP, ...).
    unsafe { (base.add(OFF_FLAGS) as *mut u32).write(CONTEXT_CONTROL) };

    if unsafe { win_suspend_sys::SuspendThread(h) } == SUSPEND_FAILED {
        // The thread was not suspended, so there is nothing to resume and its
        // context must not be inspected or modified.
        return;
    }
    if unsafe { win_suspend_sys::GetThreadContext(h, base) } == 0 {
        unsafe { win_suspend_sys::ResumeThread(h) };
        return;
    }

    let rsp = unsafe { (base.add(OFF_RSP) as *const u64).read() } as usize;
    let rip = unsafe { (base.add(OFF_RIP) as *const u64).read() } as usize;
    if unsafe { windows_should_preempt(mp, rip, rsp) } {
        // Leave 128 bytes below RSP untouched, then push the interrupted RIP
        // as a return address for the trampoline.
        let new_rsp = rsp - 128 - 8;
        unsafe {
            (new_rsp as *mut usize).write(rip);
            (base.add(OFF_RSP) as *mut u64).write(new_rsp as u64);
            (base.add(OFF_RIP) as *mut u64)
                .write(async_preempt_trampoline as *const () as u64);
            win_suspend_sys::SetThreadContext(h, base);
        }
    }
    unsafe { win_suspend_sys::ResumeThread(h) };
}
/// Re-entrancy guard for `windows_veh_handler`: set while a crash dump is in
/// progress so that a nested or concurrent exception skips the handler.
#[cfg(windows)]
static VEH_HANDLING: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// Raw kernel32 bindings used by the vectored exception handler and its
/// crash-file helpers. Handles and pointers to Win32 structures are passed as
/// untyped byte pointers.
#[cfg(windows)]
mod win_veh_sys {
#[link(name = "kernel32")]
unsafe extern "system" {
pub fn AddVectoredExceptionHandler(
FirstHandler: u32,
VectoredHandler: unsafe extern "system" fn(*mut u8) -> i32,
) -> *mut u8;
pub fn GetStdHandle(nStdHandle: u32) -> *mut u8;
pub fn WriteFile(
hFile: *mut u8,
lpBuffer: *const u8,
nNumberOfBytesToWrite: u32,
lpNumberOfBytesWritten: *mut u32,
lpOverlapped: *mut u8,
) -> i32;
pub fn FlushFileBuffers(hFile: *mut u8) -> i32;
pub fn GetCurrentProcess() -> *mut u8;
pub fn TerminateProcess(hProcess: *mut u8, uExitCode: u32) -> i32;
pub fn CreateFileW(
lpFileName: *const u16,
dwDesiredAccess: u32,
dwShareMode: u32,
lpSecurityAttributes: *mut u8,
dwCreationDisposition: u32,
dwFlagsAndAttributes: u32,
hTemplateFile: *mut u8,
) -> *mut u8;
pub fn CloseHandle(hObject: *mut u8) -> i32;
pub fn Sleep(dwMilliseconds: u32);
}
}
/// Writes the bytes of `s` to handle `h` with a single `WriteFile` call.
/// The result and the number of bytes written are ignored (best-effort output).
#[cfg(windows)]
unsafe fn veh_write(h: *mut u8, s: &[u8]) {
    let mut written: u32 = 0;
    let len = s.len() as u32;
    unsafe {
        win_veh_sys::WriteFile(h, s.as_ptr(), len, &mut written, core::ptr::null_mut());
    }
}
/// Opens (creating if necessary) the crash log `.\go-lib-crash-veh.txt` for
/// appending and returns the raw handle from `CreateFileW`. Callers check the
/// result against null and `INVALID_HANDLE` before using it.
#[cfg(windows)]
unsafe fn veh_open_crash_file() -> *mut u8 {
// NUL-terminated UTF-16 path, spelled out so no runtime conversion or
// allocation is needed.
const PATH: &[u16] = &[
b'.' as u16, b'\\' as u16,
b'g' as u16, b'o' as u16, b'-' as u16, b'l' as u16, b'i' as u16,
b'b' as u16, b'-' as u16, b'c' as u16, b'r' as u16, b'a' as u16,
b's' as u16, b'h' as u16, b'-' as u16, b'v' as u16, b'e' as u16,
b'h' as u16, b'.' as u16, b't' as u16, b'x' as u16, b't' as u16,
0u16, ];
const FILE_APPEND_DATA: u32 = 0x0000_0004;
const FILE_SHARE_READ: u32 = 0x0000_0001;
const OPEN_ALWAYS: u32 = 4; const FILE_ATTRIBUTE_NORMAL: u32 = 0x80;
unsafe {
win_veh_sys::CreateFileW(
PATH.as_ptr(),
FILE_APPEND_DATA,
FILE_SHARE_READ,
core::ptr::null_mut(),
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL,
core::ptr::null_mut(),
)
}
}
/// Writes `val` to handle `h` as `0x` followed by 16 lowercase hex digits and
/// a CRLF line ending (20 bytes in total), without allocating.
#[cfg(windows)]
unsafe fn veh_write_hex(h: *mut u8, val: u64) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    // Template: "0x" + 16 digit slots + "\r\n".
    let mut line = *b"0x0000000000000000\r\n";
    // Fill the digit slots right-to-left, least-significant nibble first.
    for (nibble, slot) in line[2..18].iter_mut().rev().enumerate() {
        *slot = HEX[((val >> (nibble * 4)) & 0xf) as usize];
    }
    let mut written: u32 = 0;
    unsafe {
        win_veh_sys::WriteFile(
            h,
            line.as_ptr(),
            20,
            &mut written,
            core::ptr::null_mut(),
        );
    }
}
/// Registers `windows_veh_handler` as a vectored exception handler and writes
/// an "installed" marker line to stderr and to the crash file.
///
/// On x86_64 it also records the address of `goroutine_entry` as the text
/// anchor used by `pc_in_our_text`.
#[cfg(windows)]
pub(crate) fn install_windows_veh() {
#[cfg(target_arch = "x86_64")]
OUR_TEXT_ANCHOR.store(goroutine_entry as *const () as usize, Relaxed);
unsafe {
// First argument 1: per the Win32 documentation a non-zero value asks for the
// handler to be called first.
win_veh_sys::AddVectoredExceptionHandler(1, windows_veh_handler);
// 0xFFFF_FFF4 is the same value `windows_veh_handler` names STD_ERROR_HANDLE.
let stderr = win_veh_sys::GetStdHandle(0xFFFF_FFF4u32); let marker = b"[go-lib] VEH installed\r\n";
let mut n = 0u32;
win_veh_sys::WriteFile(stderr, marker.as_ptr(), marker.len() as u32,
&mut n, core::ptr::null_mut());
win_veh_sys::FlushFileBuffers(stderr);
// Best effort: also append the marker to the crash file if it can be opened.
let fh = veh_open_crash_file();
const INVALID_HANDLE: *mut u8 = usize::MAX as *mut u8;
if !fh.is_null() && fh != INVALID_HANDLE {
win_veh_sys::WriteFile(fh, marker.as_ptr(), marker.len() as u32,
&mut n, core::ptr::null_mut());
win_veh_sys::CloseHandle(fh);
}
}
}
/// Vectored exception handler that turns access violations and stack
/// overflows into a crash report followed by process termination.
///
/// Any other exception code, and any exception raised while a dump is already
/// in progress, is passed on with `EXCEPTION_CONTINUE_SEARCH`. For the two
/// handled codes the exception record and selected registers are written to
/// stderr, stdout and the crash file, after which the process is terminated
/// with the exception code as its exit code.
///
/// `ep` is treated as a pointer to two pointer-sized fields: the exception
/// record followed by the context record.
/// NOTE(review): all field offsets below appear to be the x86-64
/// `EXCEPTION_RECORD` / `CONTEXT` layouts, yet the function is only gated on
/// `cfg(windows)` — confirm other Windows architectures are not built.
#[cfg(windows)]
unsafe extern "system" fn windows_veh_handler(ep: *mut u8) -> i32 {
const STATUS_ACCESS_VIOLATION: u32 = 0xC000_0005;
const STATUS_STACK_OVERFLOW: u32 = 0xC000_00FD;
const EXCEPTION_CONTINUE_SEARCH: i32 = 0;
const STD_ERROR_HANDLE: u32 = 0xFFFF_FFF4;
// Re-entrancy guard: if a dump is already running, let the exception through.
if VEH_HANDLING.swap(true, std::sync::atomic::Ordering::AcqRel) {
return EXCEPTION_CONTINUE_SEARCH;
}
let h = unsafe { win_veh_sys::GetStdHandle(STD_ERROR_HANDLE) };
let ptrs = ep as *const usize;
let er_ptr = unsafe { *ptrs } as *const u8; let ctx_ptr = unsafe { *ptrs.add(1) } as *const u8;
// The exception code is the first u32 of the exception record.
let code: u32 = unsafe { (er_ptr as *const u32).read() };
if code != STATUS_ACCESS_VIOLATION && code != STATUS_STACK_OVERFLOW {
// Not a crash we report: clear the guard and let other handlers run.
VEH_HANDLING.store(false, std::sync::atomic::Ordering::Release);
return EXCEPTION_CONTINUE_SEARCH;
}
unsafe {
// --- report to stderr ---
veh_write(h, b"[go-lib VEH] exception code : ");
veh_write_hex(h, code as u64);
let instr: usize = *(er_ptr.add(16) as *const usize);
veh_write(h, b"[go-lib VEH] ExceptionAddress : ");
veh_write_hex(h, instr as u64);
// With at least two parameters, the first two are printed as fault type and
// faulting address.
let n_params: u32 = *(er_ptr.add(24) as *const u32);
if n_params >= 2 {
let ft: usize = *(er_ptr.add(32) as *const usize);
let fa: usize = *(er_ptr.add(40) as *const usize);
veh_write(h, b"[go-lib VEH] fault_type (0=rd,1=wr,8=DEP) : ");
veh_write_hex(h, ft as u64);
veh_write(h, b"[go-lib VEH] fault_addr : ");
veh_write_hex(h, fa as u64);
}
// Registers read from the context record at fixed byte offsets.
let rip: u64 = *(ctx_ptr.add(248) as *const u64);
let rsp: u64 = *(ctx_ptr.add(152) as *const u64);
let rbp: u64 = *(ctx_ptr.add(160) as *const u64);
let rax: u64 = *(ctx_ptr.add(120) as *const u64);
let rcx: u64 = *(ctx_ptr.add(128) as *const u64);
let rdx: u64 = *(ctx_ptr.add(136) as *const u64);
let rbx: u64 = *(ctx_ptr.add(144) as *const u64);
let rsi: u64 = *(ctx_ptr.add(168) as *const u64);
let rdi: u64 = *(ctx_ptr.add(176) as *const u64);
let r8: u64 = *(ctx_ptr.add(184) as *const u64);
let r9: u64 = *(ctx_ptr.add(192) as *const u64);
veh_write(h, b"[go-lib VEH] RIP : "); veh_write_hex(h, rip);
veh_write(h, b"[go-lib VEH] RSP : "); veh_write_hex(h, rsp);
veh_write(h, b"[go-lib VEH] RBP : "); veh_write_hex(h, rbp);
veh_write(h, b"[go-lib VEH] RAX : "); veh_write_hex(h, rax);
veh_write(h, b"[go-lib VEH] RCX : "); veh_write_hex(h, rcx);
veh_write(h, b"[go-lib VEH] RDX : "); veh_write_hex(h, rdx);
veh_write(h, b"[go-lib VEH] RBX : "); veh_write_hex(h, rbx);
veh_write(h, b"[go-lib VEH] RSI : "); veh_write_hex(h, rsi);
veh_write(h, b"[go-lib VEH] RDI : "); veh_write_hex(h, rdi);
veh_write(h, b"[go-lib VEH] R8 : "); veh_write_hex(h, r8);
veh_write(h, b"[go-lib VEH] R9 : "); veh_write_hex(h, r9);
// --- shorter report to stdout (0xFFFF_FFF5 is presumably STD_OUTPUT_HANDLE — confirm) ---
let stdout = win_veh_sys::GetStdHandle(0xFFFF_FFF5u32); veh_write(stdout, b"[go-lib VEH] exception code : ");
veh_write_hex(stdout, code as u64);
let instr2: usize = *(er_ptr.add(16) as *const usize);
veh_write(stdout, b"[go-lib VEH] ExceptionAddress : ");
veh_write_hex(stdout, instr2 as u64);
let rip2: u64 = *(ctx_ptr.add(248) as *const u64);
let rsp2: u64 = *(ctx_ptr.add(152) as *const u64);
veh_write(stdout, b"[go-lib VEH] RIP : "); veh_write_hex(stdout, rip2);
veh_write(stdout, b"[go-lib VEH] RSP : "); veh_write_hex(stdout, rsp2);
win_veh_sys::FlushFileBuffers(stdout);
// --- report appended to the crash file, if it can be opened ---
let fh = veh_open_crash_file();
const INVALID_HANDLE: *mut u8 = usize::MAX as *mut u8;
if !fh.is_null() && fh != INVALID_HANDLE {
veh_write(fh, b"[go-lib VEH] exception code : ");
veh_write_hex(fh, code as u64);
let instr3: usize = *(er_ptr.add(16) as *const usize);
veh_write(fh, b"[go-lib VEH] ExceptionAddress : ");
veh_write_hex(fh, instr3 as u64);
if n_params >= 2 {
let ft2: usize = *(er_ptr.add(32) as *const usize);
let fa2: usize = *(er_ptr.add(40) as *const usize);
veh_write(fh, b"[go-lib VEH] fault_type : ");
veh_write_hex(fh, ft2 as u64);
veh_write(fh, b"[go-lib VEH] fault_addr : ");
veh_write_hex(fh, fa2 as u64);
}
let rip3: u64 = *(ctx_ptr.add(248) as *const u64);
let rsp3: u64 = *(ctx_ptr.add(152) as *const u64);
let rbp3: u64 = *(ctx_ptr.add(160) as *const u64);
let rax3: u64 = *(ctx_ptr.add(120) as *const u64);
let rcx3: u64 = *(ctx_ptr.add(128) as *const u64);
let rdx3: u64 = *(ctx_ptr.add(136) as *const u64);
veh_write(fh, b"[go-lib VEH] RIP : "); veh_write_hex(fh, rip3);
veh_write(fh, b"[go-lib VEH] RSP : "); veh_write_hex(fh, rsp3);
veh_write(fh, b"[go-lib VEH] RBP : "); veh_write_hex(fh, rbp3);
veh_write(fh, b"[go-lib VEH] RAX : "); veh_write_hex(fh, rax3);
veh_write(fh, b"[go-lib VEH] RCX : "); veh_write_hex(fh, rcx3);
veh_write(fh, b"[go-lib VEH] RDX : "); veh_write_hex(fh, rdx3);
win_veh_sys::CloseHandle(fh);
}
// Flush stderr, sleep 500 ms (presumably to let output drain — confirm), then
// terminate the process with the exception code as exit status.
win_veh_sys::FlushFileBuffers(h);
win_veh_sys::Sleep(500);
win_veh_sys::TerminateProcess(win_veh_sys::GetCurrentProcess(), code);
}
EXCEPTION_CONTINUE_SEARCH
}
/// Terminates the calling OS thread and never returns: `pthread_exit` on
/// non-Windows targets, `ExitThread(0)` on Windows. Exported unmangled with the
/// C ABI.
#[unsafe(no_mangle)]
pub(crate) unsafe extern "C" fn m_thread_exit() -> ! {
#[cfg(not(windows))]
unsafe {
libc::pthread_exit(core::ptr::null_mut())
}
#[cfg(windows)]
unsafe {
m_thread_exit_sys::ExitThread(0)
}
}
/// Raw kernel32 binding for `ExitThread`, used by `m_thread_exit` on Windows.
#[cfg(windows)]
mod m_thread_exit_sys {
#[link(name = "kernel32")]
unsafe extern "system" {
pub fn ExitThread(dwExitCode: u32) -> !;
}
}
/// Boxed goroutine body; `goroutine_entry` takes ownership of it and calls it once.
struct GoFn(Box<dyn FnOnce() + Send + 'static>);
/// Shared, thread-safe callback that receives the payload of a goroutine panic.
type PanicFn = Arc<dyn Fn(Box<dyn Any + Send + 'static>) + Send + Sync + 'static>;
/// Currently installed panic callback; `None` means "print to stderr".
static PANIC_HANDLER: Mutex<Option<PanicFn>> = Mutex::new(None);
/// Installs `f` as the process-wide handler for panics that escape a goroutine,
/// replacing any previously installed handler.
///
/// A poisoned handler mutex is recovered rather than propagated, matching how
/// `handle_goroutine_panic` reads the handler. The previous handler is dropped
/// only after the lock has been released, so a panicking destructor in its
/// captured state cannot poison the mutex.
pub fn set_panic_handler<F>(f: F)
where
    F: Fn(Box<dyn Any + Send + 'static>) + Send + Sync + 'static,
{
    let new_handler: PanicFn = Arc::new(f);
    // The guard is a temporary of this statement and is released at its end.
    let previous = PANIC_HANDLER
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .replace(new_handler);
    drop(previous);
}
/// Dispatches a goroutine panic payload: to the installed handler if there is
/// one, otherwise as a one-line message on stderr.
///
/// The handler is cloned out of the mutex first, so the lock is not held while
/// the callback runs; a poisoned mutex is tolerated.
fn handle_goroutine_panic(payload: Box<dyn Any + Send + 'static>) {
    let installed = {
        let slot = PANIC_HANDLER
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        slot.clone()
    };

    if let Some(callback) = installed {
        callback(payload);
        return;
    }

    let msg = extract_panic_msg(&payload);
    eprintln!("goroutine panicked: {msg}");
}
/// Produces a human-readable message from a panic payload.
///
/// Recognises `String` and `&str` payloads. A payload that is itself a
/// `Box<dyn Any + Send>` is unwrapped and inspected again, up to `MAX_DEPTH`
/// levels deep. Anything else yields `"(unknown panic payload)"`.
fn extract_panic_msg(payload: &(dyn Any + Send)) -> String {
    const MAX_DEPTH: u32 = 4;

    let mut current = payload;
    let mut depth: u32 = 0;
    loop {
        if let Some(owned) = current.downcast_ref::<String>() {
            return owned.clone();
        }
        if let Some(literal) = current.downcast_ref::<&str>() {
            return (*literal).to_string();
        }
        if depth >= MAX_DEPTH {
            break;
        }
        // Peel one layer of boxing and look again.
        match current.downcast_ref::<Box<dyn Any + Send + 'static>>() {
            Some(boxed) => {
                current = &**boxed;
                depth += 1;
            }
            None => break,
        }
    }
    "(unknown panic payload)".to_string()
}
/// Returns the current GOMAXPROCS setting of the calling thread's runtime.
pub fn gomaxprocs() -> usize {
    let procs = sched().gomaxprocs.load(Relaxed);
    procs as usize
}
/// Sets GOMAXPROCS to `n` (clamped to `1..=256`) and returns the previous value.
///
/// When the new value exceeds the number of Ps that exist, the missing Ps are
/// allocated, appended to `allp`, and one M is spawned per new P after the
/// scheduler lock has been released. Lowering the value only updates the
/// counters; existing Ps stay in `allp`.
///
/// Because shrinking never removes Ps, new P ids start at
/// `max(old, allp.len())`. Starting at `old` would, after a shrink followed by
/// a grow, allocate duplicate Ps for ids that already exist and spawn
/// surplus Ms.
pub fn set_gomaxprocs(n: usize) -> usize {
    let n = n.clamp(1, 256) as i32;
    let sc = sched();

    let mut inner = sc.inner.lock().unwrap();
    let old = inner.gomaxprocs;

    // First id that is not yet backed by a P; the range below is empty when
    // enough Ps already exist.
    let first_missing = old.max(inner.allp.len() as i32);
    let new_ps: Vec<*mut P> = (first_missing..n)
        .map(|id| Box::into_raw(P::new(id)))
        .collect();
    inner.allp.extend_from_slice(&new_ps);
    inner.gomaxprocs = n;
    sc.gomaxprocs.store(n, Relaxed);
    // Do not hold the scheduler lock while starting threads.
    drop(inner);

    for p_ptr in new_ps {
        let id = NEXT_MID.fetch_add(1, Relaxed);
        unsafe { spawn_m(sc, id, p_ptr) };
    }

    old as usize
}
/// Counter starting at 1; not referenced in the visible part of this file —
/// presumably the source of goroutine ids (TODO confirm).
static NEXT_GOID: AtomicU64 = AtomicU64::new(1);
/// Pool of dead goroutines available for reuse, stored as addresses
/// (`*mut G` cast to `usize`); filled by `gfree_put`, drained by `gfree_get`.
static G_FREE: Mutex<Vec<usize>> = Mutex::new(Vec::new());
/// Cached answer of `gpool_off`: `u8::MAX` = not yet determined, `1` = pool
/// disabled, `0` = pool enabled.
static GPOOL_OFF: AtomicU8 = AtomicU8::new(u8::MAX);
/// Reports whether goroutine pooling is disabled, i.e. whether the
/// environment variable `GOLIB_GPOOL_OFF` is exactly `"1"`.
///
/// The environment is consulted on the first call only; the result is cached
/// in `GPOOL_OFF`.
#[inline]
fn gpool_off() -> bool {
    let cached = GPOOL_OFF.load(Relaxed);
    if cached != u8::MAX {
        return cached == 1;
    }
    let disabled = std::env::var("GOLIB_GPOOL_OFF").is_ok_and(|value| value == "1");
    GPOOL_OFF.store(u8::from(disabled), Relaxed);
    disabled
}
/// Retires a finished goroutine descriptor.
///
/// With pooling enabled, `gp` is pushed onto `G_FREE` for reuse by `new_goroutine`. Its stack
/// is kept attached only when it has the standard `GOROUTINE_STACK_BYTES` size; any other
/// size is returned to the stack pool. With pooling disabled (`gpool_off()`), the stack is
/// always returned and `gp` is not pooled.
///
/// Whenever the stack is released, the bounds stored in `gp` are zeroed in both modes, so the
/// descriptor never keeps pointing at memory that may already belong to another goroutine.
///
/// # Safety
///
/// `gp` must point to a valid `G` whose stack is no longer in use by any thread.
unsafe fn gfree_put(gp: *mut G) {
    let pooling = !gpool_off();
    let stack = unsafe { (*gp).stack };
    if !pooling || stack.hi - stack.lo != GOROUTINE_STACK_BYTES {
        unsafe { stack_pool_free(&stack) };
        // Mark "no stack attached"; `new_goroutine` checks `lo == 0` when reusing a G.
        unsafe { (*gp).stack = Stack { lo: 0, hi: 0 } };
    }
    if !pooling {
        return;
    }
    let _lk = super::m::m_lock();
    G_FREE.lock().unwrap().push(gp as usize);
}
/// Takes a retired `G` from the free list, if pooling is enabled and one is available.
///
/// # Safety
///
/// The returned pointer refers to a dead descriptor; the caller must fully reinitialise it
/// before use.
unsafe fn gfree_get() -> Option<*mut G> {
    if gpool_off() {
        return None;
    }
    let _lk = super::m::m_lock();
    let addr = G_FREE.lock().unwrap().pop()?;
    Some(addr as *mut G)
}
/// Source of `M` ids passed to `spawn_m`. Starts at 1.
static NEXT_MID: AtomicI64 = AtomicI64::new(1);
/// Swapped to `true` by the first `schedinit` call so the process-wide signal/exception
/// handlers are installed only once.
static SIGNALS_INSTALLED: AtomicBool = AtomicBool::new(false);
/// Entry point of every goroutine (`new_goroutine` stores this function's address in
/// `sched.pc`).
///
/// Reclaims the boxed `GoFn` left in the current G's `sched.ctxt`, runs it under
/// `catch_unwind`, and passes any panic payload to `handle_goroutine_panic`. A panic raised
/// by that handler is itself caught and discarded, so no unwind leaves this function.
unsafe extern "C" fn goroutine_entry() {
// SIGURG is blocked while the closure pointer is moved out of the G and unblocked right
// after. NOTE(review): presumably this keeps signal-driven preemption away from the
// ownership hand-off — confirm against `super::m`.
#[cfg(not(windows))]
unsafe { super::m::block_sigurg() };
let gp = current_g();
let go_fn = unsafe {
// Take the pointer and null the field so the box has exactly one owner.
let fn_ptr = (*gp).sched.ctxt as *mut GoFn;
(*gp).sched.ctxt = ptr::null_mut();
Box::from_raw(fn_ptr)
};
#[cfg(not(windows))]
unsafe { super::m::unblock_sigurg() };
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (go_fn.0)()));
if let Err(payload) = result {
// The handler may be user code; a panic inside it is swallowed.
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
handle_goroutine_panic(payload);
}));
}
}
/// x86_64 return target for `goroutine_entry`.
///
/// `new_goroutine` writes this function's address into the top slot of a fresh stack, so
/// returning from the entry function lands here. It calls `goexit0_handler`, which is
/// declared as never returning; `ud2` traps if control ever comes back.
#[cfg(target_arch = "x86_64")]
#[unsafe(naked)]
unsafe extern "C" fn goexit_trampoline() -> ! {
core::arch::naked_asm!(
"call {handler}",
"ud2", handler = sym goexit0_handler,
)
}
/// Rust half of the x86_64 goroutine exit path, called from `goexit_trampoline`.
///
/// Blocks SIGURG (non-Windows), increments the current M's `locks` counter, then hands the
/// current G to `goexit0` through `mcall`.
/// NOTE(review): `mcall` is assumed never to return here, since the following statement is
/// `unreachable_unchecked` — confirm against the assembly.
#[cfg(target_arch = "x86_64")]
unsafe extern "C" fn goexit0_handler() -> ! {
#[cfg(not(windows))]
unsafe { super::m::block_sigurg() };
unsafe { (*current_m()).locks.fetch_add(1, std::sync::atomic::Ordering::Relaxed) };
let gp = current_g();
unsafe { mcall(gp, goexit0) };
unsafe { std::hint::unreachable_unchecked() }
}
/// aarch64 return target for `goroutine_entry`.
///
/// `new_goroutine` stores this function's address in `sched.lr`. It blocks SIGURG
/// (non-Windows), increments the current M's `locks` counter, then hands the current G to
/// `goexit0` through `mcall`.
/// NOTE(review): `mcall` is assumed never to return here, since the following statement is
/// `unreachable_unchecked` — confirm against the assembly.
#[cfg(target_arch = "aarch64")]
unsafe extern "C" fn goexit_trampoline() -> ! {
#[cfg(not(windows))]
unsafe { super::m::block_sigurg() };
unsafe { (*current_m()).locks.fetch_add(1, std::sync::atomic::Ordering::Relaxed) };
let gp = current_g();
unsafe { mcall(gp, goexit0) };
unsafe { std::hint::unreachable_unchecked() }
}
/// Builds a runnable goroutine descriptor for `f` without queueing it.
///
/// A descriptor is taken from the free list when one is available (allocating a stack only
/// if the recycled descriptor has none attached); otherwise a fresh `G` and stack are
/// allocated. The closure is boxed into `sched.ctxt`, `sched.pc` is pointed at
/// `goroutine_entry`, and the saved stack pointer / return address are arranged so that
/// returning from the entry function reaches `goexit_trampoline`. The status is set to
/// `GRUNNABLE` last.
///
/// # Panics
///
/// Panics if `stack_alloc` fails.
pub(crate) fn new_goroutine(f: impl FnOnce() + Send + 'static) -> *mut G {
    let goid = NEXT_GOID.fetch_add(1, Relaxed);
    let gp: *mut G = if let Some(recycled) = unsafe { gfree_get() } {
        // `lo == 0` marks a pooled descriptor whose stack was released on retirement.
        let kept = unsafe { (*recycled).stack };
        let stack = if kept.lo != 0 {
            kept
        } else {
            unsafe { stack_alloc().expect("new_goroutine: stack_alloc failed (reuse)") }
        };
        unsafe {
            // Overwrite in place without running a destructor on the stale contents.
            ptr::write(recycled, G::value(stack, goid));
            (*recycled).sched.g = recycled;
        }
        recycled
    } else {
        let stack = unsafe { stack_alloc().expect("new_goroutine: stack_alloc failed") };
        Box::into_raw(G::new(stack, goid))
    };

    let g: &mut G = unsafe { &mut *gp };
    // Ownership of the closure passes to `goroutine_entry` through `sched.ctxt`.
    let boxed_body = Box::new(GoFn(Box::new(f)));
    g.sched.ctxt = Box::into_raw(boxed_body) as *mut u8;
    g.sched.pc = goroutine_entry as *const () as usize;

    #[cfg(target_arch = "x86_64")]
    {
        // The top stack slot holds the return address used when the entry function returns.
        let entry_sp = g.stack.hi - 8;
        unsafe { (entry_sp as *mut usize).write(goexit_trampoline as *const () as usize) };
        g.sched.sp = entry_sp;
        g.sched.bp = 0;
    }
    #[cfg(target_arch = "aarch64")]
    {
        // The return address lives in the saved link register instead of on the stack.
        g.sched.lr = goexit_trampoline as *const () as usize;
        g.sched.sp = g.stack.hi;
        g.sched.bp = 0;
    }

    g.atomicstatus.store(GRUNNABLE, Release);
    gp
}
/// Creates a goroutine for `f` and queues it for execution.
///
/// The new `G` is recorded in `allg`, pushed onto the global run queue as a one-element
/// batch, and `startm` is called with a null argument. In debug builds this asserts that a
/// runtime is active on the calling thread.
pub(crate) fn spawn_goroutine(f: impl FnOnce() + Send + 'static) {
    debug_assert!(
        !current_rt_ptr().is_null(),
        "spawn_goroutine called without an active Rt; call run_impl first"
    );
    let fresh = new_goroutine(f);
    let _lk = super::m::m_lock();
    let rt = sched();
    // Register the G before it becomes visible on the run queue.
    rt.allg.lock().unwrap().push(fresh);
    unsafe {
        (*fresh).schedlink = ptr::null_mut();
        rt.global_run_q.push_batch(fresh, fresh, 1);
        startm(ptr::null_mut());
    }
}
/// Builds and starts a runtime: allocates the leaked `Rt`, creates the `P`s, starts one `M`
/// thread per `P`, installs the process-wide handlers (once), then starts netpoll, sysmon
/// and the timer thread.
///
/// `nprocs` is the default processor count. A `GOMAXPROCS` environment variable that parses
/// to a value in `1..=256` overrides it.
///
/// # Panics
///
/// Panics if `nprocs < 1` (checked before the environment override is applied).
fn schedinit(nprocs: i32) -> &'static Rt {
assert!(nprocs >= 1, "schedinit: nprocs must be ≥ 1");
{
// Install, once per process, a panic hook that forwards to the previous hook only when
// no goroutine is current. Panics raised inside a goroutine therefore skip the previous
// hook; they are reported by `handle_goroutine_panic` instead.
static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
if !PANIC_HOOK_INSTALLED.swap(true, AcqRel) {
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
if current_g().is_null() {
prev_hook(info);
}
}));
}
}
// An unset, unparsable or out-of-range GOMAXPROCS falls back to the argument.
let nprocs = std::env::var("GOMAXPROCS")
.ok()
.and_then(|s| s.parse::<i32>().ok())
.filter(|&n| (1..=256).contains(&n))
.unwrap_or(nprocs);
// The runtime is leaked on purpose so it can be handed out as `&'static`.
let rt: &'static Rt = Box::leak(Box::new(Rt {
global_run_q: GlobalRunQueue::new(),
nmspinning: AtomicI32::new(0),
gomaxprocs: AtomicI32::new(nprocs),
inner: Mutex::new(SchedInner {
idle_p: ptr::null_mut(),
idle_m: ptr::null_mut(),
nmidle: 0,
allp: Vec::new(),
gomaxprocs: nprocs,
}),
allg: Mutex::new(Vec::new()),
shutdown: AtomicBool::new(false),
}));
set_current_rt(rt as *const Rt);
// One P per processor slot, ids 0..nprocs.
let ps: Vec<*mut P> = (0..nprocs)
.map(|id| Box::into_raw(P::new(id)))
.collect();
{
let mut inner = rt.inner.lock().unwrap();
inner.allp = ps.clone();
}
// Start one M thread per P.
// NOTE(review): these threads begin running before the handlers below are installed and
// before `netpoll_init`; confirm `schedule()` tolerates that window.
for p_ptr in ps {
let id = NEXT_MID.fetch_add(1, Relaxed);
unsafe { spawn_m(rt, id, p_ptr) };
}
// Handlers are process-wide, so they are installed by the first caller only.
if !SIGNALS_INSTALLED.swap(true, AcqRel) {
#[cfg(not(windows))]
unsafe { install_sigsegv_handler() };
#[cfg(not(windows))]
unsafe { install_sigurg_handler() };
#[cfg(not(windows))]
unsafe { install_sigbus_handler() };
#[cfg(windows)]
install_windows_veh();
}
super::netpoll::netpoll_init();
start_sysmon(rt);
start_timer_thread();
rt
}
/// Allocates an `M` with the given id, binds it to `p`, and starts an OS thread for it.
///
/// `p` is marked `PRUNNING` before the thread starts. The new thread adopts `rt` as its
/// current runtime, calls the M's `start`, then enters `schedule()`. The thread is detached.
///
/// # Safety
///
/// `p` must point to a valid `P` that is not bound to another `M`.
unsafe fn spawn_m(rt: &'static Rt, id: i64, p: *mut P) {
    let machine: *mut M = Box::into_raw(unsafe { M::new(id) });
    unsafe {
        (*machine).p = p;
        (*p).m = machine;
        (*p).status.store(PRUNNING, Release);
    }
    // Raw pointers are not `Send`; carry plain addresses across the thread boundary.
    let (machine_addr, rt_addr) = (machine as usize, rt as *const Rt as usize);
    std::thread::spawn(move || unsafe {
        set_current_rt(rt_addr as *const Rt);
        (*(machine_addr as *mut M)).start();
        schedule();
    });
}
pub(crate) fn run_impl<F, R>(f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let nprocs = std::thread::available_parallelism()
.map(|n| n.get() as i32)
.unwrap_or(1);
let rt: &'static Rt = GLOBAL_RT.get_or_init(|| schedinit(nprocs));
set_current_rt(rt as *const Rt);
struct UnparkOnDrop(std::thread::Thread);
impl Drop for UnparkOnDrop {
fn drop(&mut self) { self.0.unpark(); }
}
type Slot<R> = Result<R, Box<dyn std::any::Any + Send + 'static>>;
let slot: Arc<Mutex<Option<Slot<R>>>> = Arc::new(Mutex::new(None));
let slot2 = Arc::clone(&slot);
let caller = std::thread::current();
let wrapper = move || {
let _guard = UnparkOnDrop(caller);
let result =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
*slot2.lock().unwrap() = Some(result);
};
spawn_goroutine(wrapper);
std::thread::park();
match slot.lock().unwrap().take() {
Some(Ok(v)) => v,
Some(Err(payload)) => {
let msg = extract_panic_msg(payload.as_ref());
std::panic::panic_any(format!("goroutine panicked: {msg}"))
}
None => panic!("go_lib::__main_entry: first goroutine exited without storing a result"),
}
}
// Unit tests for goroutine creation, the global run queue, and scheduler entry.
// Tests marked `#[go_lib::main]` are wrapped by that attribute macro.
// NOTE(review): the macro is assumed to run the test body as a goroutine on the shared
// runtime — confirm against the macro definition.
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
// A single G pushed as a one-element batch comes back from `pop` and the queue length
// goes 1 -> 0. The stack bounds are placeholder addresses that are never dereferenced here.
#[test]
fn global_run_queue_round_trip() {
use crate::runtime::g::{Stack, G};
use crate::runtime::p::GlobalRunQueue;
use crate::runtime::stack::GOROUTINE_STACK_BYTES;
let q = GlobalRunQueue::new();
let lo = 0x200000usize;
let g1 = G::new(Stack { lo, hi: lo + GOROUTINE_STACK_BYTES }, 99);
let g1_ptr = Box::into_raw(g1);
unsafe {
(*g1_ptr).schedlink = ptr::null_mut();
q.push_batch(g1_ptr, g1_ptr, 1);
assert_eq!(q.len(), 1);
let got = q.pop();
assert_eq!(got, g1_ptr);
assert_eq!(q.len(), 0);
// Reclaim the descriptor allocated above.
let _ = Box::from_raw(g1_ptr);
}
}
// `new_goroutine` must produce a GRUNNABLE descriptor with pc/ctxt set and the
// architecture-specific return path pointing at `goexit_trampoline`.
#[test]
fn new_goroutine_fields() {
use crate::runtime::g::GRUNNABLE;
use std::sync::atomic::Ordering::Relaxed;
let g_ptr = new_goroutine(|| {});
unsafe {
assert_eq!(
(*g_ptr).atomicstatus.load(Relaxed),
GRUNNABLE,
"new goroutine must start as Grunnable"
);
assert_ne!((*g_ptr).sched.pc, 0, "pc must be set to goroutine_entry");
assert!(!(*g_ptr).sched.ctxt.is_null(), "ctxt must hold the closure");
#[cfg(target_arch = "x86_64")]
{
// x86_64: sp points at the top stack slot, which holds the return address.
assert_eq!((*g_ptr).sched.sp, (*g_ptr).stack.hi - 8);
let ret_addr = ((*g_ptr).sched.sp as *const usize).read();
assert_eq!(ret_addr, goexit_trampoline as *const () as usize);
}
#[cfg(target_arch = "aarch64")]
{
// aarch64: sp is the stack top and the return address is in the saved lr.
assert_eq!((*g_ptr).sched.sp, (*g_ptr).stack.hi);
assert_eq!((*g_ptr).sched.lr, goexit_trampoline as *const () as usize);
}
// Retire the descriptor through the normal path (it was never run).
gfree_put(g_ptr);
}
}
// Smoke test: the body executes and observes its own store.
#[test]
#[go_lib::main]
fn run_single_goroutine() {
use std::sync::atomic::{AtomicBool, Ordering};
static RAN: AtomicBool = AtomicBool::new(false);
RAN.store(true, Ordering::Release);
assert!(RAN.load(Ordering::Acquire), "goroutine body did not execute");
}
// Inside an entry body the singleton runtime exists and a goroutine spawned with `go!`
// runs: it sends 99 over a zero-capacity channel and the body receives it.
#[test]
#[go_lib::main]
fn entry_reuses_running_scheduler() {
assert!(
!global_rt_ptr().is_null(),
"the singleton scheduler must be running inside an entry body"
);
let (tx, rx) = crate::chan::chan::<i32>(0);
crate::go!(move || { tx.send(99); });
assert_eq!(rx.recv(), Some(99), "spawned goroutine did not run");
}
// Execution resumes after a single `gosched()` call.
#[test]
#[go_lib::main]
fn gosched_returns_to_caller() {
use std::sync::atomic::{AtomicBool, Ordering};
static AFTER_YIELD: AtomicBool = AtomicBool::new(false);
unsafe { gosched() };
AFTER_YIELD.store(true, Ordering::Release);
assert!(
AFTER_YIELD.load(Ordering::Acquire),
"execution must continue after gosched()"
);
}
// Yielding in a loop lets a spawned goroutine run and set the flag; the test hangs
// rather than fails if it never does.
#[test]
#[go_lib::main]
fn gosched_allows_other_goroutines_to_run() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
let flag = Arc::new(AtomicBool::new(false));
let flag_setter = Arc::clone(&flag);
spawn_goroutine(move || {
flag_setter.store(true, Ordering::Release);
});
while !flag.load(Ordering::Acquire) {
unsafe { gosched() };
}
}
// Two batches of N goroutines each record `m.locks` as seen at goroutine start. The second
// batch is spawned only after the whole first batch has reported in, so a nonzero value seen
// only there points at the exit path (which increments `locks`) not being balanced.
#[test]
#[go_lib::main]
fn goroutine_exit_releases_m_locks() {
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::sync::Arc;
const N: usize = 64;
let exited = Arc::new(AtomicUsize::new(0));
let checked = Arc::new(AtomicUsize::new(0));
let max_locks1 = Arc::new(AtomicI32::new(0));
let max_locks2 = Arc::new(AtomicI32::new(0));
let exited_w = Arc::clone(&exited);
let checked_w = Arc::clone(&checked);
let max_locks1_w = Arc::clone(&max_locks1);
let max_locks2_w = Arc::clone(&max_locks2);
// Batch 1: sample `locks` before any of these goroutines has exited.
for _ in 0..N {
let exited = Arc::clone(&exited_w);
let max_locks1 = Arc::clone(&max_locks1_w);
spawn_goroutine(move || {
let locks = unsafe { (*current_m()).locks.load(Ordering::Relaxed) };
max_locks1.fetch_max(locks, Ordering::AcqRel);
exited.fetch_add(1, Ordering::AcqRel);
});
}
while exited_w.load(Ordering::Acquire) < N {
unsafe { gosched() };
}
// Batch 2: sample again after batch 1 has finished its bodies.
for _ in 0..N {
let checked = Arc::clone(&checked_w);
let max_locks2 = Arc::clone(&max_locks2_w);
spawn_goroutine(move || {
let locks = unsafe { (*current_m()).locks.load(Ordering::Relaxed) };
max_locks2.fetch_max(locks, Ordering::AcqRel);
checked.fetch_add(1, Ordering::AcqRel);
});
}
while checked_w.load(Ordering::Acquire) < N {
unsafe { gosched() };
}
let m1 = max_locks1.load(Ordering::Acquire);
let m2 = max_locks2.load(Ordering::Acquire);
assert!(
m1 == 0 && m2 == 0,
"m.locks nonzero at goroutine entry (batch1 max = {m1}, batch2 \
max = {m2}) — batch1 > 0 means the stray count predates any \
goroutine exit (spawn/ready side); batch2-only means the goexit \
path leaked an increment"
);
}
// Stress test: while an `m_lock()` guard is held, `current_m()` must not change. N
// goroutines each take the guard 2000 times, re-check the M up to 64 times per hold, and
// yield between holds.
#[test]
#[go_lib::main]
fn m_lock_pins_current_m_under_load() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::runtime::m::m_lock;
const N: usize = 128;
let violations = Arc::new(AtomicUsize::new(0));
let done = Arc::new(AtomicUsize::new(0));
let violations_w = Arc::clone(&violations);
let done_w = Arc::clone(&done);
for _ in 0..N {
let v = Arc::clone(&violations_w);
let d = Arc::clone(&done_w);
spawn_goroutine(move || {
for _ in 0..2_000 {
let guard = m_lock();
let pinned = current_m();
for _ in 0..64 {
if current_m() != pinned {
// Count at most one violation per guard hold.
v.fetch_add(1, Ordering::Relaxed);
break;
}
std::hint::spin_loop();
}
drop(guard);
unsafe { gosched() };
}
d.fetch_add(1, Ordering::Relaxed);
});
}
while done_w.load(Ordering::Acquire) < N {
unsafe { gosched() };
}
assert_eq!(
violations.load(Ordering::Acquire),
0,
"current_m() changed while an MLockGuard was held — the optimistic \
pin failed to suppress preemption (Guard 0 did not observe the \
atomic m.locks bump)"
);
}
}