use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Context;
use crate::platform::sys::{create_pipe, events_with_capacity, Interest};
use crate::reactor::{with_reactor, with_reactor_mut};
use crate::time::{next_timer_deadline, tick_timer_wheel};
#[cfg(unix)]
use crate::signal::{on_signal_readable, SIGNAL_TOKEN};
use super::scheduler::{GlobalQueue, LocalQueue};
use super::task::{Task, TaskHeader, STATE_CANCELLED, STATE_COMPLETED};
use super::waker::{make_waker_with_notifier, WorkerNotifier};
use super::work_stealing::{StealableQueue, WorkStealingPool};
const WAKE_TOKEN: usize = usize::MAX - 1;
pub(crate) struct WorkerThread {
pub worker_id: usize,
pub stealable: Arc<StealableQueue>,
pub local: LocalQueue,
pub global: Arc<GlobalQueue>,
pub steal_pool: Arc<WorkStealingPool>,
pub tasks: Arc<Mutex<HashMap<usize, Task>>>,
pub shutdown: Arc<AtomicBool>,
notifier: Arc<WorkerNotifier>,
wake_rx: i32,
wake_tx: i32,
}
impl WorkerThread {
pub(crate) fn new(
worker_id: usize,
global: Arc<GlobalQueue>,
steal_pool: Arc<WorkStealingPool>,
tasks: Arc<Mutex<HashMap<usize, Task>>>,
shutdown: Arc<AtomicBool>,
notifier: Arc<WorkerNotifier>,
) -> std::io::Result<Self> {
let (wake_rx, wake_tx) = create_pipe()?;
with_reactor(|r| r.register(wake_rx, WAKE_TOKEN, Interest::READABLE))?;
let stealable = Arc::new(StealableQueue::new());
Ok(Self {
worker_id,
stealable,
local: LocalQueue::new(),
global,
steal_pool,
tasks,
shutdown,
notifier,
wake_rx,
wake_tx,
})
}
pub(crate) fn wake_tx(&self) -> i32 {
self.wake_tx
}
fn next_task(&mut self) -> Option<Arc<TaskHeader>> {
if let Some(h) = self.local.pop() {
return Some(h);
}
{
let mut sq = self.stealable.local_mut();
if !sq.is_empty() {
let mut batch = Vec::with_capacity(16);
sq.drain_front(&mut batch, 16);
drop(sq);
for h in batch {
if self.local.push(h).is_some() {
}
}
return self.local.pop();
}
}
if let Some(h) = self.global.pop() {
return Some(h);
}
let n = self
.steal_pool
.steal_one(self.worker_id, &mut self.local, &self.global);
if n > 0 {
return self.local.pop();
}
None
}
pub(crate) fn run(&mut self) {
loop {
if self.shutdown.load(Ordering::Acquire) {
self.drain_all_tasks();
break;
}
let expired = tick_timer_wheel(std::time::Instant::now());
for w in expired {
w.wake();
}
let mut did_work = false;
loop {
let Some(header) = self.next_task() else {
break;
};
did_work = true;
self.run_task(header);
}
if !did_work {
if self.shutdown.load(Ordering::Acquire) {
self.drain_all_tasks();
break;
}
self.park();
}
}
}
fn run_task(&mut self, header: Arc<TaskHeader>) {
let key = Arc::as_ptr(&header) as usize;
let state = header.state.load(Ordering::Acquire);
if state == STATE_CANCELLED {
let task = self.tasks.lock().unwrap().remove(&key);
if let Some(t) = task {
t.cancel();
}
return;
}
if state == STATE_COMPLETED {
self.tasks.lock().unwrap().remove(&key);
return;
}
let waker = make_waker_with_notifier(
Arc::clone(&header),
Arc::clone(&self.global),
Some(Arc::clone(&self.notifier)),
);
let mut cx = Context::from_waker(&waker);
let task = self.tasks.lock().unwrap().remove(&key);
if let Some(task) = task {
let completed = task.poll_task(&mut cx);
if !completed {
self.tasks.lock().unwrap().insert(key, task);
}
}
}
fn drain_all_tasks(&mut self) {
while let Some(h) = self.local.pop() {
let _ = h; }
{
let mut sq = self.stealable.local_mut();
while sq.pop().is_some() {}
}
}
fn park(&self) {
const MAX_PARK_MS: u64 = 10;
let timeout_ms = match next_timer_deadline() {
None => MAX_PARK_MS,
Some(deadline) => {
let now = std::time::Instant::now();
if deadline <= now {
0
} else {
let ms = deadline.duration_since(now).as_millis() as u64;
ms.min(MAX_PARK_MS)
}
}
};
let mut events = events_with_capacity(64);
let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
self.drain_wake_pipe();
#[cfg(unix)]
{
let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
if signal_fired {
on_signal_readable();
}
}
}
#[cfg(unix)]
fn drain_wake_pipe(&self) {
let mut buf = [0u8; 64];
loop {
let n = unsafe { libc::read(self.wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
if n <= 0 {
break;
}
}
}
#[cfg(not(unix))]
fn drain_wake_pipe(&self) {}
}
impl Drop for WorkerThread {
fn drop(&mut self) {
let _ = with_reactor(|r| r.deregister(self.wake_rx));
#[cfg(unix)]
unsafe {
libc::close(self.wake_rx);
libc::close(self.wake_tx);
}
}
}
thread_local! {
pub(crate) static CURRENT_WORKER_WAKE_TX: std::cell::Cell<i32> =
const { std::cell::Cell::new(-1) };
}
pub(crate) fn set_current_worker_wake_tx(fd: i32) {
CURRENT_WORKER_WAKE_TX.with(|c| c.set(fd));
}
pub(crate) fn clear_current_worker_wake_tx() {
CURRENT_WORKER_WAKE_TX.with(|c| c.set(-1));
}