#![allow(dead_code)]
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
};
use std::thread;
use std::time::{Duration, SystemTime};
use crate::debug::is_debug_mode;
use crate::manager::{IdleThreshold, StatusBehaviorDefinitions, StatusBehaviors};
use crate::model::*;
use crate::pool::PoolStatus;
use crossbeam_channel as channel;
// Timeout durations. Neither constant is referenced in the visible part of this file.
const TIMEOUT: Duration = Duration::from_micros(16);
const LONG_TIMEOUT: Duration = Duration::from_micros(96);
// Modulus applied to a worker id in `spawn_worker` to decide which queue, if any, the
// worker polls patiently instead of skipping on the first empty read.
const LOT_COUNTS: usize = 3;
// Round budget `fetch_work` uses when it is not allowed to skip an empty channel.
const LONG_PARKING_ROUNDS: u8 = 8;
// Round budget `fetch_work` selects when it is allowed to skip an empty channel.
const SHORT_PARKING_ROUNDS: u8 = 2;
// Empty `thread_local!` invocation: it declares no thread-local statics.
thread_local!();
/// Handle to one pool worker thread, together with the hooks fired around its teardown.
pub(crate) struct Worker {
/// Identifier supplied at construction; returned by `get_id` and passed to every hook.
id: usize,
/// Join handle of the backing thread; `retire` takes it out and leaves `None`.
thread: Option<thread::JoinHandle<()>>,
/// Weak reference to the status counter created in `spawn_worker` and owned by the
/// worker thread's closure. Values written in this file: 0 at creation, 1 on the
/// retire/close paths, or the non-zero code returned by `IdleThreshold::idle_stat`.
/// `is_terminated` compares the counter against 2.
stat: Weak<AtomicUsize>,
/// Optional hook called with the worker id at the start of `drop`.
before_drop: Option<WorkerUpdate>,
/// Optional hook called with the worker id at the end of `drop`, after `retire`.
after_drop: Option<WorkerUpdate>,
}
/// Outcome of one polling pass in `check_queues`: the first field is -1 when a channel
/// reported disconnection and 0 otherwise; the second field is the job to run, if any.
struct WorkStatus(i8, Option<Job>);
impl Worker {
/// Builds a worker and starts its backing thread.
///
/// The `before_start` and `after_start` hooks of `behavior_definition` are invoked with
/// `my_id` immediately before and after the thread is spawned. The drop hooks are copied
/// out of `behavior_definition` afterwards and stored on the returned worker.
///
/// All remaining arguments are forwarded unchanged to `spawn_worker`.
pub(crate) fn new(
    name: Option<String>,
    my_id: usize,
    stack_size: usize,
    privileged: bool,
    rx_pair: (channel::Receiver<Message>, channel::Receiver<Message>),
    shared_info: (PoolStatus, IdleThreshold),
    behavior_definition: &StatusBehaviors,
) -> Worker {
    behavior_definition.before_start(my_id);
    let (handle, stat) =
        Self::spawn_worker(name, my_id, stack_size, privileged, rx_pair, shared_info);
    behavior_definition.after_start(my_id);

    // Captured only after the start hooks have run, in the same order as the fields.
    let before_drop = behavior_definition.before_drop_clone();
    let after_drop = behavior_definition.after_drop_clone();

    Worker {
        id: my_id,
        thread: Some(handle),
        stat,
        before_drop,
        after_drop,
    }
}
/// Returns the identifier this worker was created with.
pub(crate) fn get_id(&self) -> usize {
self.id
}
/// Asks the worker thread to stop and blocks until it has been joined.
///
/// Does nothing when the join handle has already been taken. Otherwise the status
/// counter (if the thread still owns it) is set to 1, the thread is unparked so it can
/// observe the request, and the thread is joined. A join error is reported on stderr
/// rather than propagated.
pub(crate) fn retire(&mut self) {
    let handle = match self.thread.take() {
        Some(handle) => handle,
        None => return,
    };

    if let Some(flag) = self.stat.upgrade() {
        flag.store(1, Ordering::SeqCst);
    }

    // The thread may be parked; wake it so it reaches the stop check.
    handle.thread().unpark();

    if let Err(err) = handle.join() {
        eprintln!("Unable to drop worker: {}, error: {:?}", self.id, err);
    }
}
pub(crate) fn wake_up(&self) {
if let Some(handle) = self.thread.as_ref() {
handle.thread().unpark();
}
}
/// Reports whether the worker thread has terminated.
///
/// Returns `true` when the status counter reads 2, or when the counter no longer exists.
/// The only strong reference to the counter is owned by the worker thread's closure, so
/// it is dropped as soon as that thread finishes; a failed upgrade therefore means the
/// thread has already exited. Previously that case was reported as `false`, which made a
/// finished worker look alive.
pub(crate) fn is_terminated(&self) -> bool {
    match self.stat.upgrade() {
        Some(stat) => stat.load(Ordering::Acquire) == 2,
        // Counter dropped: the thread that owned it has returned.
        None => true,
    }
}
/// Spawns the worker thread and returns its join handle together with a weak reference
/// to the thread's status counter.
///
/// * `name` - thread name to apply; when `None` the thread is left unnamed.
/// * `my_id` - worker id; `my_id % LOT_COUNTS` selects which queue (if any) this worker
///   polls patiently.
/// * `stack_size` - applied to the thread builder only when greater than zero.
/// * `privileged` - when `true` no idle timestamp is kept, so no idle duration is ever
///   reported for this worker.
/// * `rx_pair` - `(priority, normal)` receivers, passed to `check_queues` in that order.
/// * `shared_info` - the shared pool status and the idle threshold.
///
/// The status counter starts at 0. The thread stores 1 into it before exiting on the
/// close/disconnect paths, and stores any non-zero code from `IdleThreshold::idle_stat`.
///
/// # Panics
///
/// Panics if the operating system fails to spawn the thread.
fn spawn_worker(
    name: Option<String>,
    my_id: usize,
    stack_size: usize,
    privileged: bool,
    rx_pair: (channel::Receiver<Message>, channel::Receiver<Message>),
    shared_info: (PoolStatus, IdleThreshold),
) -> (thread::JoinHandle<()>, Weak<AtomicUsize>) {
    let mut builder = thread::Builder::new();

    // Only an explicitly supplied name is applied; unnamed workers stay unnamed.
    if let Some(name) = name {
        builder = builder.name(name);
    }

    if stack_size > 0 {
        builder = builder.stack_size(stack_size);
    }

    // The closure below owns the only strong reference; callers get a weak one.
    let worker_stat = Arc::new(AtomicUsize::new(0));
    let stat_clone = Arc::downgrade(&worker_stat);

    let handle = builder
        .spawn(move || {
            let mut pri_work_count: u8 = 0;

            // Privileged workers keep no timestamp and so never produce an idle duration.
            let mut since = if privileged {
                None
            } else {
                Some(SystemTime::now())
            };

            let (pool_status, idle_threshold) = shared_info;

            // Lot 0 waits on the priority queue, lot 1 on the normal queue, others on neither.
            let (pri_wait, norm_wait) = match my_id % LOT_COUNTS {
                0 => (true, false),
                1 => (false, true),
                _ => (false, false),
            };

            loop {
                // A stored 1 is the stop request (see `retire`).
                if worker_stat.load(Ordering::SeqCst) == 1 {
                    return;
                }

                let status: u8 = pool_status.load();
                if status == FLAG_FORCE_CLOSE
                    || ((status == FLAG_CLOSING || status == FLAG_REST)
                        && rx_pair.0.is_empty()
                        && rx_pair.1.is_empty())
                {
                    worker_stat.store(1, Ordering::SeqCst);
                    return;
                }

                let work = match Worker::check_queues(
                    &rx_pair.0,
                    &rx_pair.1,
                    pri_wait,
                    norm_wait,
                    &mut pri_work_count,
                ) {
                    // A disconnected channel ends the worker.
                    WorkStatus(-1, _) => {
                        worker_stat.store(1, Ordering::SeqCst);
                        return;
                    }
                    WorkStatus(_, job) => job,
                };

                // NOTE(review): a stored code of 1 is the same value the loop head treats
                // as a stop request, so a worker parked for `Some(1)` exits on the next
                // iteration after being woken. Confirm this is intended.
                let idle_stat: Option<u8> = Worker::handle_work(work, &mut since)
                    .or_else(|| Worker::calc_idle(&since))
                    .and_then(|idle| {
                        let stat_code = idle_threshold.idle_stat(idle.as_secs());
                        if stat_code > 0 {
                            worker_stat.store(stat_code as usize, Ordering::SeqCst);
                            Some(stat_code)
                        } else {
                            None
                        }
                    });

                match idle_stat {
                    Some(1) => {
                        pool_status.toggle_flag(FLAG_SLEEP_WORKERS, true);
                        thread::park();
                    }
                    Some(2) => return,
                    _ => {}
                }

                // NOTE(review): `status` was read before polling and is not refreshed
                // after the park above, so this may park a second time on a stale value
                // without re-checking the stop flag. Verify against the callers of
                // `retire` and `wake_up`.
                if status == FLAG_HIBERNATING {
                    thread::park();
                }
            }
        })
        .unwrap();

    (handle, stat_clone)
}
/// Polls the priority channel and then the normal channel for one message.
///
/// `pri_work_count` tracks consecutive priority jobs. It is incremented up to 4; once it
/// has reached 4 and the normal channel is full, it is set to 255, which makes the next
/// call skip the priority channel and reset the counter to 0. A message taken from the
/// normal channel also resets it to 0.
///
/// Returns `WorkStatus(-1, None)` when either channel is disconnected, otherwise
/// `WorkStatus(0, job)` where `job` is whatever `unpack_message` extracted, or `None`
/// when both polls timed out.
fn check_queues(
    pri_chan: &channel::Receiver<Message>,
    norm_chan: &channel::Receiver<Message>,
    pri_wait: bool,
    norm_wait: bool,
    pri_work_count: &mut u8,
) -> WorkStatus {
    if *pri_work_count == 255 {
        // Priority queue was flagged to be skipped for this round.
        *pri_work_count = 0;
    } else {
        let norm_full = norm_chan.is_full();
        let skip_pri = norm_full && !pri_wait;

        match Worker::fetch_work(pri_chan, skip_pri) {
            Err(channel::RecvTimeoutError::Timeout) => {}
            Err(channel::RecvTimeoutError::Disconnected) => return WorkStatus(-1, None),
            Ok(message) => {
                let job = Worker::unpack_message(message).0;

                if *pri_work_count >= 4 {
                    if norm_full {
                        *pri_work_count = 255;
                    }
                } else {
                    *pri_work_count += 1;
                }

                return WorkStatus(0, job);
            }
        }
    }

    let skip_norm = pri_chan.is_full() && !norm_wait;

    match Worker::fetch_work(norm_chan, skip_norm) {
        Ok(message) => {
            let job = Worker::unpack_message(message).0;
            *pri_work_count = 0;
            WorkStatus(0, job)
        }
        Err(channel::RecvTimeoutError::Disconnected) => WorkStatus(-1, None),
        Err(channel::RecvTimeoutError::Timeout) => WorkStatus(0, None),
    }
}
/// Tries to take one message from `main_chan` without blocking.
///
/// With `can_skip` set, a single empty read ends the attempt. Otherwise the channel is
/// re-polled back to back, for `LONG_PARKING_ROUNDS + 1` reads in total, before giving up.
///
/// Returns the message on success, `RecvTimeoutError::Disconnected` when the channel is
/// disconnected, and `RecvTimeoutError::Timeout` when no message arrived in time.
fn fetch_work(
    main_chan: &channel::Receiver<Message>,
    can_skip: bool,
) -> Result<Message, channel::RecvTimeoutError> {
    let rounds = if can_skip {
        SHORT_PARKING_ROUNDS
    } else {
        LONG_PARKING_ROUNDS
    };

    // `0..=rounds` yields `rounds + 1` attempts.
    for _ in 0..=rounds {
        match main_chan.try_recv() {
            Ok(work) => return Ok(work),
            Err(channel::TryRecvError::Disconnected) => {
                return Err(channel::RecvTimeoutError::Disconnected);
            }
            Err(channel::TryRecvError::Empty) if can_skip => break,
            Err(channel::TryRecvError::Empty) => {}
        }
    }

    Err(channel::RecvTimeoutError::Timeout)
}
/// Runs `work` when present, then reports the time elapsed since `since` and restarts
/// that timestamp.
///
/// Returns `None` when `since` is `None` (no timestamp is kept), or when `calc_idle`
/// cannot compute the elapsed time.
///
/// NOTE(review): the duration is measured after the job has run, so it includes the
/// job's own running time, and the timestamp is restarted on every call even when
/// `work` is `None`. Confirm this matches the intended meaning of "idle".
fn handle_work(work: Option<Job>, since: &mut Option<SystemTime>) -> Option<Duration> {
    if let Some(job) = work {
        job.call_box();
    }

    match since {
        None => None,
        Some(_) => {
            let idle = Worker::calc_idle(since);
            *since = Some(SystemTime::now());
            idle
        }
    }
}
/// Splits a channel message into an optional job and an optional list of targets.
///
/// `SingleJob` yields the job only; `Terminate` yields its target list only.
///
/// # Panics
///
/// Panics on `Message::ChainedJobs`, which this function treats as unreachable.
fn unpack_message(message: Message) -> (Option<Job>, Option<Vec<usize>>) {
    match message {
        Message::Terminate(targets) => (None, Some(targets)),
        Message::SingleJob(task) => (Some(task), None),
        Message::ChainedJobs(_) => unreachable!(),
    }
}
/// Returns the time elapsed since `since`.
///
/// Yields `None` when no timestamp is given, or when the system clock reports that the
/// timestamp lies in the future (`SystemTime::elapsed` fails).
fn calc_idle(since: &Option<SystemTime>) -> Option<Duration> {
    since
        .as_ref()
        .and_then(|started| started.elapsed().ok())
}
}
impl Drop for Worker {
    /// Tears the worker down: fires the `before_drop` hook, retires (stops and joins)
    /// the thread, then fires the `after_drop` hook. Both hooks receive the worker id.
    /// In debug mode a line is printed to stdout before retiring.
    fn drop(&mut self) {
        let id = self.id;

        if let Some(notify) = self.before_drop {
            notify(id);
        }

        if is_debug_mode() {
            println!("Dropping worker {}", id);
        }

        self.retire();

        if let Some(notify) = self.after_drop {
            notify(id);
        }
    }
}