use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use super::g::{current_g, WaitReason};
use super::park::{gopark, goready};
use super::g::G;
/// Returns the number of nanoseconds elapsed since this function was first called.
///
/// The reference instant is captured lazily on the first call, so the first reading is
/// close to zero. Readings are based on `Instant`, so later calls never report a smaller
/// value than earlier ones.
fn mono_ns() -> u64 {
    static EPOCH: OnceLock<Instant> = OnceLock::new();
    let since_epoch = EPOCH.get_or_init(Instant::now).elapsed();
    since_epoch.as_nanos() as u64
}
#[derive(Eq, PartialEq)]
struct TimerEntry {
when: u64,
gp: *mut G,
}
unsafe impl Send for TimerEntry {}
impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
Reverse(self.when).cmp(&Reverse(other.when))
}
}
/// Pending sleep timers, shared between sleeping goroutines and the timer thread.
/// `TimerEntry` orders itself in reverse by `when`, so the top of this max-heap is the
/// entry with the earliest deadline.
static TIMER_HEAP: Mutex<BinaryHeap<TimerEntry>> = Mutex::new(BinaryHeap::new());
/// Handle of the timer thread; `sleep` uses it to unpark that thread when a newly queued
/// deadline is earlier than everything already in the heap.
static TIMER_THREAD: OnceLock<std::thread::Thread> = OnceLock::new();
/// Starts the background timer thread (named `go-timer`) if it is not running yet.
///
/// Safe to call any number of times and from several threads at once: the spawn happens
/// inside `TIMER_THREAD.get_or_init`, so exactly one thread is ever created and its handle
/// is already published in `TIMER_THREAD` when this function returns. Checking
/// `TIMER_THREAD` first and letting the new thread register itself later would leave a
/// window in which a second caller spawns a duplicate thread that can never be unparked.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub(crate) fn start_timer_thread() {
    TIMER_THREAD.get_or_init(|| {
        std::thread::Builder::new()
            .name("go-timer".into())
            .spawn(timer_thread_body)
            .expect("time: failed to spawn timer thread")
            // The join handle is dropped (the thread is detached); keep only the
            // `Thread` handle needed for `unpark`.
            .thread()
            .clone()
    });
}
/// Main loop of the timer thread; never returns.
///
/// Registers this thread's handle in `TIMER_THREAD` (ignored if a handle is already
/// stored), then repeatedly parks until the earliest queued deadline and wakes whatever
/// has become due.
fn timer_thread_body() {
    let _ = TIMER_THREAD.set(std::thread::current());
    loop {
        let wait = {
            let timers = TIMER_HEAP.lock().unwrap();
            match timers.peek() {
                // Nothing queued: re-check once a second even without an unpark.
                None => Duration::from_secs(1),
                // Time left until the earliest deadline; zero if it has already passed.
                Some(head) => Duration::from_nanos(head.when.saturating_sub(mono_ns())),
            }
        };
        // The heap lock is released before parking so sleepers can queue new timers.
        if !wait.is_zero() {
            std::thread::park_timeout(wait);
        }
        // Runs after every wake-up, including an early unpark; entries that are not
        // due yet simply stay in the heap.
        fire_expired();
    }
}
/// Removes every timer whose deadline is at or before the current `mono_ns` reading and
/// passes its goroutine to `goready`.
///
/// The due goroutines are collected while the heap lock is held, but `goready` is only
/// called after the lock has been released.
fn fire_expired() {
    let now = mono_ns();
    let mut due: Vec<*mut G> = Vec::new();
    {
        let mut timers = TIMER_HEAP.lock().unwrap();
        // The heap top is always the earliest deadline, so stop at the first entry
        // that is still in the future.
        while timers.peek().is_some_and(|head| head.when <= now) {
            if let Some(expired) = timers.pop() {
                due.push(expired.gp);
            }
        }
    }
    // NOTE(review): assumes each pointer still refers to a live, parked goroutine —
    // confirm against `goready`'s contract.
    due.into_iter().for_each(|gp| unsafe { goready(gp) });
}
pub(crate) unsafe fn sleep(d: Duration) {
if d.is_zero() {
unsafe { super::sched::gosched() };
return;
}
let when = mono_ns().saturating_add(d.as_nanos() as u64);
let gp = current_g();
debug_assert!(!gp.is_null(), "sleep: called from g0");
let need_unpark = {
let mut heap = TIMER_HEAP.lock().unwrap();
let was_earliest = heap.peek().map(|e| e.when > when).unwrap_or(true);
heap.push(TimerEntry { when, gp });
was_earliest
};
if need_unpark && let Some(t) = TIMER_THREAD.get() {
t.unpark();
}
unsafe { gopark(WaitReason::Sleep) };
}
/// Public entry point for putting the current goroutine to sleep; forwards `d`
/// unchanged to the crate-internal `sleep`.
///
/// # Safety
///
/// The caller must satisfy whatever `sleep` requires. NOTE(review): presumably this
/// means being called from a running goroutine rather than g0 — confirm.
pub unsafe fn goroutine_sleep(d: Duration) {
// SAFETY: this function's own contract is exactly that of `sleep`.
unsafe { sleep(d) };
}
// Unit tests for the timer module; compiled only for ordinary test builds (not under loom).
// `unused_unsafe` is allowed because several calls below sit in nested `unsafe` blocks.
#[cfg(all(test, not(loom)))]
#[allow(unused_unsafe)] mod tests {
use super::*;
use crate::runtime::sched::run_impl;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
// Two consecutive clock readings must never go backwards.
#[test]
fn mono_ns_monotonic() {
let a = mono_ns();
let b = mono_ns();
assert!(b >= a, "mono_ns must be non-decreasing");
}
// A 10 ms sleep inside `run_impl` must return, and not noticeably early:
// at least 8 ms must have passed (2 ms of slack).
#[test]
fn fire_expired_wakes_goroutine() {
use std::time::Instant;
run_impl(|| {
let t0 = Instant::now();
unsafe { sleep(Duration::from_millis(10)) };
let elapsed = t0.elapsed();
assert!(
elapsed >= Duration::from_millis(8),
"timer did not fire: elapsed only {:?}", elapsed
);
});
}
// A zero-length sleep takes the yield-only path and must simply return.
#[test]
fn sleep_zero_yields() {
run_impl(|| {
unsafe { sleep(Duration::ZERO) };
});
}
// A spawned goroutine sleeps 5 ms and then sets a flag; the closure passed to
// `run_impl` polls the flag for at most 200 ms.
#[test]
fn sleep_short_duration() {
use crate::runtime::sched::spawn_goroutine;
let done = Arc::new(AtomicI32::new(0));
let done2 = Arc::clone(&done);
run_impl(move || {
unsafe {
spawn_goroutine(move || {
unsafe { sleep(Duration::from_millis(5)) };
done2.store(1, Ordering::Relaxed);
});
}
let deadline = Instant::now() + Duration::from_millis(200);
loop {
if done.load(Ordering::Acquire) == 1 { break; }
if Instant::now() >= deadline {
panic!("sleep did not complete within 200 ms");
}
// Yield to the scheduler, then pause the OS thread briefly between polls.
crate::gosched();
std::thread::sleep(Duration::from_millis(1));
}
});
}
// N goroutines each sleep 10 ms and bump a shared counter; all of them must have
// woken within 500 ms.
#[test]
fn concurrent_sleepers() {
use crate::runtime::sched::spawn_goroutine;
const N: i32 = 4;
let awoke = Arc::new(AtomicI32::new(0));
let awoke2 = Arc::clone(&awoke);
run_impl(move || {
for _ in 0..N {
let awoke3 = Arc::clone(&awoke2);
unsafe {
spawn_goroutine(move || {
unsafe { sleep(Duration::from_millis(10)) };
awoke3.fetch_add(1, Ordering::Relaxed);
});
}
}
let deadline = Instant::now() + Duration::from_millis(500);
loop {
if awoke2.load(Ordering::Acquire) == N { break; }
if Instant::now() >= deadline {
panic!("not all sleepers woke within 500 ms");
}
// Yield to the scheduler, then pause the OS thread briefly between polls.
crate::gosched();
std::thread::sleep(Duration::from_millis(5));
}
});
// Re-check through the handle kept outside the closure once `run_impl` has returned.
assert_eq!(awoke.load(Ordering::Acquire), N);
}
}