use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use super::g::{current_g, WaitReason};
use super::park::{gopark, goready};
use super::g::G;
use super::sched::set_current_rt;
fn mono_ns() -> u64 {
static START: OnceLock<Instant> = OnceLock::new();
let start = *START.get_or_init(Instant::now);
Instant::now().duration_since(start).as_nanos() as u64
}
#[derive(Eq, PartialEq)]
struct TimerEntry {
when: u64,
gp: *mut G,
}
unsafe impl Send for TimerEntry {}
impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
Reverse(self.when).cmp(&Reverse(other.when))
}
}
const TIMER_SHARDS: usize = 64;
const NO_TIMER: u64 = u64::MAX;
struct TimerShard {
heap: Mutex<BinaryHeap<TimerEntry>>,
earliest: AtomicU64,
}
impl TimerShard {
const fn new() -> Self {
TimerShard {
heap: Mutex::new(BinaryHeap::new()),
earliest: AtomicU64::new(NO_TIMER),
}
}
}
#[inline]
fn refresh_earliest(shard: &TimerShard, heap: &BinaryHeap<TimerEntry>) {
let e = heap.peek().map(|e| e.when).unwrap_or(NO_TIMER);
shard.earliest.store(e, Relaxed);
}
static TIMER_SHARDS_ARR: [TimerShard; TIMER_SHARDS] =
[const { TimerShard::new() }; TIMER_SHARDS];
static TIMER_THREAD: OnceLock<std::thread::Thread> = OnceLock::new();
#[inline]
fn current_shard() -> &'static TimerShard {
let p = super::sched::current_p();
let idx = if p.is_null() {
0
} else {
(unsafe { (*p).id } as usize) & (TIMER_SHARDS - 1)
};
&TIMER_SHARDS_ARR[idx]
}
pub(crate) fn start_timer_thread() {
static STARTED: std::sync::Once = std::sync::Once::new();
STARTED.call_once(|| {
std::thread::Builder::new()
.name("go-timer".into())
.spawn(timer_thread_body)
.expect("time: failed to spawn timer thread");
});
}
fn timer_thread_body() {
let handle = std::thread::current();
let _ = TIMER_THREAD.set(handle);
loop {
let next = TIMER_SHARDS_ARR
.iter()
.map(|s| s.earliest.load(Relaxed))
.min()
.unwrap_or(NO_TIMER);
let sleep_dur = if next == NO_TIMER {
Duration::from_secs(1) } else {
let now = mono_ns();
if next <= now {
Duration::ZERO
} else {
Duration::from_nanos(next - now)
}
};
if sleep_dur > Duration::ZERO {
std::thread::park_timeout(sleep_dur);
}
fire_expired();
}
}
fn fire_expired() {
use super::g::{readgstatus, GDEAD, GWAITING};
let now = mono_ns();
struct WakeEntry { gp: *mut G }
let rt = super::sched::global_rt_ptr();
if rt.is_null() {
return;
}
set_current_rt(rt);
let retry_when = now.saturating_add(5_000_000);
for shard in &TIMER_SHARDS_ARR {
if shard.earliest.load(Relaxed) > now {
continue; }
let mut heap = shard.heap.lock().unwrap();
let mut to_wake: Vec<WakeEntry> = Vec::new();
while let Some(entry) = heap.peek() {
if entry.when <= now {
let entry = heap.pop().unwrap();
to_wake.push(WakeEntry { gp: entry.gp });
} else {
break;
}
}
for entry in to_wake {
let s = unsafe { readgstatus(entry.gp) };
if s == GWAITING {
unsafe { goready(entry.gp) };
} else if s != GDEAD {
heap.push(TimerEntry { when: retry_when, gp: entry.gp });
}
}
refresh_earliest(shard, &heap);
}
}
pub(crate) unsafe fn sleep(d: Duration) {
if d.is_zero() {
unsafe { super::sched::gosched() };
return;
}
let when = mono_ns().saturating_add(d.as_nanos() as u64);
let gp = current_g();
debug_assert!(!gp.is_null(), "sleep: called from g0");
let shard = current_shard();
let need_unpark = {
let mut heap = shard.heap.lock().unwrap();
let lowered = shard.earliest.load(Relaxed) > when;
heap.push(TimerEntry { when, gp });
if lowered {
shard.earliest.store(when, Relaxed);
}
lowered
};
if need_unpark && let Some(t) = TIMER_THREAD.get() {
t.unpark();
}
gopark(WaitReason::Sleep);
}
pub(crate) unsafe fn goroutine_sleep(d: Duration) {
unsafe { sleep(d) };
}
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
#[test]
fn mono_ns_monotonic() {
let a = mono_ns();
let b = mono_ns();
assert!(b >= a, "mono_ns must be non-decreasing");
}
#[test]
#[go_lib::main]
fn fire_expired_wakes_goroutine() {
use std::time::Instant;
let t0 = Instant::now();
unsafe { sleep(Duration::from_millis(10)) };
let elapsed = t0.elapsed();
assert!(
elapsed >= Duration::from_millis(8),
"timer did not fire: elapsed only {:?}", elapsed
);
}
#[test]
#[go_lib::main]
fn sleep_zero_yields() {
unsafe { sleep(Duration::ZERO) };
}
#[test]
#[go_lib::main]
fn sleep_short_duration() {
use crate::runtime::sched::spawn_goroutine;
use crate::sync::WaitGroup;
let wg = Arc::new(WaitGroup::new());
let wg2 = Arc::clone(&wg);
wg.add(1);
spawn_goroutine(move || {
unsafe { sleep(Duration::from_millis(5)) };
wg2.done();
});
wg.wait();
}
#[test]
#[go_lib::main]
fn many_sleepers_across_shards() {
use crate::runtime::sched::spawn_goroutine;
use crate::sync::WaitGroup;
const N: i32 = 200;
let awoke = Arc::new(AtomicI32::new(0));
let awoke2 = Arc::clone(&awoke);
let wg = Arc::new(WaitGroup::new());
for k in 0..N {
let awoke3 = Arc::clone(&awoke2);
let wg2 = Arc::clone(&wg);
wg.add(1);
spawn_goroutine(move || {
unsafe { sleep(Duration::from_millis(5 + (k % 7) as u64)) };
awoke3.fetch_add(1, Ordering::Relaxed);
wg2.done();
});
}
wg.wait();
assert_eq!(awoke.load(Ordering::Acquire), N);
}
#[test]
#[go_lib::main]
fn concurrent_sleepers() {
use crate::runtime::sched::spawn_goroutine;
use crate::sync::WaitGroup;
const N: i32 = 4;
let awoke = Arc::new(AtomicI32::new(0));
let awoke2 = Arc::clone(&awoke);
let wg = Arc::new(WaitGroup::new());
for _ in 0..N {
let awoke3 = Arc::clone(&awoke2);
let wg2 = Arc::clone(&wg);
wg.add(1);
spawn_goroutine(move || {
unsafe { sleep(Duration::from_millis(10)) };
awoke3.fetch_add(1, Ordering::Relaxed);
wg2.done();
});
}
wg.wait();
assert_eq!(awoke.load(Ordering::Acquire), N);
}
}