use std::{sync::Arc, thread};
use core_affinity::CoreId;
use crossbeam::atomic::AtomicCell;
#[cfg(feature = "crossbeam_channel")]
pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
#[cfg(not(feature = "crossbeam_channel"))]
pub use std::sync::mpsc::{
channel as unbounded, sync_channel as bounded, Receiver, TryRecvError,
};
#[cfg(all(not(feature = "crossbeam_channel"), not(feature = "thread_task_bounded")))]
pub use std::sync::mpsc::Sender;
#[cfg(all(not(feature = "crossbeam_channel"), feature = "thread_task_bounded"))]
pub use std::sync::mpsc::SyncSender as Sender;
fn get_default_bounded_capacity() -> usize {
let cpu_count = num_cpus::get();
let capacity = (cpu_count * 100).max(128).min(4096);
capacity
}
pub struct TaskExecutor {
jobs: Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
_handle: thread::JoinHandle<()>,
pub count: Arc<AtomicCell<i64>>,
core: usize,
}
impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TaskExecutor")
.field("_handle", &self._handle)
.field("count", &self.count)
.field("core", &self.core)
.finish()
}
}
impl TaskExecutor {
pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
#[cfg(feature = "thread_task_bounded")]
let (tx, rx) = {
let capacity = get_default_bounded_capacity();
bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity)
};
#[cfg(not(feature = "thread_task_bounded"))]
let (tx, rx) = unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();
let count = Arc::new(AtomicCell::new(0_i64));
let task_count = count.clone();
let _handle = thread::spawn(move || {
super::set_core_affinity_and_realtime(core.id, realtime);
let core_id = core.id;
let old_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
let thread = std::thread::current();
let thread_name = thread.name().unwrap_or("unnamed");
let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
s.clone()
} else {
format!(
"Unknown panic payload type: {:?}",
panic_info.payload().type_id()
)
};
let location_info = if let Some(location) = panic_info.location() {
format!(
"file: '{}', line: {}, column: {}",
location.file(),
location.line(),
location.column()
)
} else {
"unknown location".to_string()
};
error!(
"PANIC in TaskExecutor worker thread!\n\
┌─ Thread Info ─────────────────────────────────────┐\n\
│ Thread Name: {}\n\
│ Core ID: {}\n\
│ Thread ID: {:?}\n\
├─ Panic Details ──────────────────────────────────┤\n\
│ Message: {}\n\
│ Location: {}\n\
└──────────────────────────────────────────────────┘",
thread_name,
core_id,
thread.id(),
panic_message,
location_info
);
old_hook(panic_info);
}));
Self::run_worker_loop(rx, task_count, core_id);
});
TaskExecutor {
jobs: tx,
_handle,
count,
core: core.id,
}
}
#[cfg(feature = "thread_task_bounded")]
pub fn new_with_capacity(core: CoreId, capacity: usize, realtime: i32) -> TaskExecutor {
let (tx, rx) = bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity);
let count = Arc::new(AtomicCell::new(0_i64));
let task_count = count.clone();
let _handle = thread::spawn(move || {
super::set_core_affinity_and_realtime(core.id, realtime);
let core_id = core.id;
let old_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
let thread = std::thread::current();
let thread_name = thread.name().unwrap_or("unnamed");
let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
s.clone()
} else {
format!(
"Unknown panic payload type: {:?}",
panic_info.payload().type_id()
)
};
let location_info = if let Some(location) = panic_info.location() {
format!(
"file: '{}', line: {}, column: {}",
location.file(),
location.line(),
location.column()
)
} else {
"unknown location".to_string()
};
error!(
"PANIC in TaskExecutor worker thread!\n\
┌─ Thread Info ─────────────────────────────────────┐\n\
│ Thread Name: {}\n\
│ Core ID: {}\n\
│ Thread ID: {:?}\n\
├─ Panic Details ──────────────────────────────────┤\n\
│ Message: {}\n\
│ Location: {}\n\
└──────────────────────────────────────────────────┘",
thread_name,
core_id,
thread.id(),
panic_message,
location_info
);
old_hook(panic_info);
}));
Self::run_worker_loop(rx, task_count, core_id);
});
TaskExecutor {
jobs: tx,
_handle,
count,
core: core.id,
}
}
fn run_worker_loop(
rx: Receiver<Box<dyn FnOnce(&usize) + Send + 'static>>,
task_count: Arc<AtomicCell<i64>>,
core_id: usize,
) {
#[cfg(feature = "thread_dispatch")]
{
let mut empty_count = 0;
loop {
match rx.try_recv() {
Ok(job) => {
job(&core_id);
task_count.fetch_sub(1);
empty_count = 0;
}
Err(TryRecvError::Empty) => {
empty_count += 1;
if empty_count > 1000 {
empty_count = 0;
if let Ok(job) = rx.recv() {
job(&core_id);
task_count.fetch_sub(1);
}
}
}
Err(TryRecvError::Disconnected) => {
error!("TaskExecutor disconnected: {}", core_id);
break;
}
}
}
}
#[cfg(not(feature = "thread_dispatch"))]
loop {
if let Ok(job) = rx.try_recv() {
job(&core_id);
task_count.fetch_sub(1);
}
}
}
#[inline(always)]
pub fn spawn<F>(&self, f: F)
where
F: FnOnce(&usize) + Send + 'static,
{
self.count.fetch_add(1);
if let Err(e) = self.jobs.send(Box::new(f)) {
error!("TaskExecutor send error: {:?}", e);
e.0(&0);
self.count.fetch_sub(1);
}
}
#[cfg(all(feature = "thread_task_bounded", feature = "crossbeam_channel"))]
#[inline(always)]
pub fn try_spawn<F>(&self, f: F) -> bool
where
F: FnOnce(&usize) + Send + 'static,
{
match self.jobs.try_send(Box::new(f)) {
Ok(_) => {
self.count.fetch_add(1);
true
}
Err(_) => false, }
}
#[cfg(all(feature = "thread_task_bounded", not(feature = "crossbeam_channel")))]
#[inline(always)]
pub fn try_spawn<F>(&self, f: F) -> bool
where
F: FnOnce(&usize) + Send + 'static,
{
match self.jobs.try_send(Box::new(f)) {
Ok(_) => {
self.count.fetch_add(1);
true
}
Err(_) => false, }
}
}