#![allow(dead_code)]
#![allow(clippy::redundant_pub_crate)]
use crate::error::ExecutorError;
use crate::fatal::{FatalDispatch, FatalSite, guard_or_fatal};
use crossbeam_channel::{Receiver, Sender, bounded};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
/// Unit of work carried over the pool's channel from a submitter to a worker thread.
enum Job {
/// Boxed, owned closure; it is consumed when the worker calls it.
Owned(Box<dyn FnOnce() + Send + 'static>),
/// Raw-pointer handle to a closure the pool does not own; the worker
/// dereferences the pointer to invoke it.
Borrowed(BorrowedJob),
}
/// Thin wrapper around a raw pointer to a `FnMut() + Send` closure.
///
/// The wrapper neither owns nor frees the pointee; the pool only dereferences
/// the pointer to call the closure when the job runs.
#[allow(unsafe_code)]
pub(crate) struct BorrowedJob(*mut (dyn FnMut() + Send));
impl BorrowedJob {
/// Wraps `ptr` without checking it.
///
/// # Safety
///
/// The pool later dereferences `ptr` and calls the closure, either on the
/// submitting thread or on a worker thread. The caller must therefore keep
/// the pointee valid, and not access it concurrently, until the job has run.
/// NOTE(review): the complete contract is not spelled out in this file —
/// confirm against the call sites.
#[allow(unsafe_code)]
pub(crate) const unsafe fn new(ptr: *mut (dyn FnMut() + Send)) -> Self {
Self(ptr)
}
}
// SAFETY: raw pointers are not `Send` by default, so this impl is what lets a
// `BorrowedJob` travel through the job channel to a worker thread. The pointee
// type is itself bounded by `Send`; validity of the pointer is the obligation
// of whoever called `BorrowedJob::new`.
#[allow(unsafe_code)]
unsafe impl Send for BorrowedJob {}
/// Counts submitted and completed jobs so a caller can block until the two
/// counters agree.
#[derive(Default)]
struct Tracker {
    /// Number of jobs handed to the pool so far.
    submitted: AtomicUsize,
    /// Number of jobs that have finished running so far.
    completed: AtomicUsize,
    /// Signalled every time a job completes.
    cv: Condvar,
    /// Pairs with `cv`. It protects no data, so a poisoned state carries no
    /// meaning and is ignored by every method below.
    lock: Mutex<()>,
}

impl Tracker {
    /// Records that one more job has been submitted.
    fn submit(&self) {
        self.submitted.fetch_add(1, Ordering::SeqCst);
    }

    /// Records that one job has finished and wakes every waiter.
    ///
    /// Never panics: a poisoned lock is recovered instead of unwrapped, so a
    /// worker thread calling this cannot die here and leave waiters blocked.
    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    fn complete(&self) {
        self.completed.fetch_add(1, Ordering::SeqCst);
        // Acquire and immediately release the lock before notifying: a waiter
        // holds the lock from its counter check until it parks on `cv`, so
        // once we get the lock that waiter is either parked (and will be
        // woken) or has not checked yet (and will see the new count).
        drop(
            self.lock
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner),
        );
        self.cv.notify_all();
    }

    /// Blocks the calling thread until `submitted == completed`.
    ///
    /// Returns immediately when the counters already agree. Poisoning of the
    /// internal lock is ignored on every acquisition, including re-acquisition
    /// after each wake-up.
    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    // The guard must stay alive across the whole loop for `Condvar::wait`.
    #[allow(clippy::significant_drop_tightening)]
    fn wait_for_quiescence(&self) {
        let mut guard = self
            .lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // A hand-written loop is used on purpose: `Condvar::wait_while` would
        // return early with an error on a poisoned lock, before the counters
        // agree.
        while self.submitted.load(Ordering::SeqCst) != self.completed.load(Ordering::SeqCst) {
            guard = self
                .cv
                .wait(guard)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
        }
    }
}
/// Job pool that runs submitted work either inline on the submitting thread
/// or on dedicated worker threads.
pub(crate) struct Pool {
/// Execution strategy, fixed in `Pool::new` (inline when zero workers are requested).
mode: PoolMode,
/// Submitted/completed counters that back `Pool::barrier`.
tracker: Arc<Tracker>,
/// Dispatcher handed to `guard_or_fatal` around every job the pool runs.
pub(crate) fatal: Arc<FatalDispatch>,
}
/// How a `Pool` executes the jobs submitted to it.
enum PoolMode {
/// No worker threads: each job runs on the thread that submits it.
Inline,
/// Jobs are sent over a bounded channel to spawned worker threads.
Threaded {
/// Sending half of the job channel; swapped out in `Drop` so the original
/// sender is dropped.
tx: Sender<Job>,
/// Join handles of the spawned workers; joined when the pool is dropped.
handles: Vec<JoinHandle<()>>,
/// Flag each worker checks before every receive; set to `true` when the
/// pool is dropped.
shutdown: Arc<std::sync::atomic::AtomicBool>,
},
}
/// Main loop of one worker thread.
///
/// Repeatedly checks `shutdown`, then blocks on `rx` for the next job. Each
/// received job is run through `guard_or_fatal` with
/// `FatalSite::PoolWorker`, after which the job is reported to `tracker` as
/// complete. The loop ends when `shutdown` is observed as `true` or when
/// `rx.recv()` returns an error.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
// The arguments are moved into the spawned thread's closure, so they are
// taken by value even though the body only borrows them.
#[allow(clippy::needless_pass_by_value)]
#[allow(unsafe_code)]
fn run_worker(
    rx: Receiver<Job>,
    tracker: Arc<Tracker>,
    shutdown: Arc<std::sync::atomic::AtomicBool>,
    fatal: Arc<FatalDispatch>,
) {
    loop {
        // The flag is checked before every receive, so jobs still queued when
        // shutdown is requested are not picked up by this worker.
        if shutdown.load(Ordering::Acquire) {
            return;
        }
        let job = match rx.recv() {
            Ok(job) => job,
            Err(_) => return,
        };
        match job {
            Job::Owned(task) => {
                guard_or_fatal(&fatal, FatalSite::PoolWorker, task);
            }
            Job::Borrowed(borrowed) => {
                // SAFETY: relies on the contract of `Pool::submit_borrowed` /
                // `BorrowedJob::new` — the submitter guarantees the pointer is
                // still valid when the job runs.
                guard_or_fatal(&fatal, FatalSite::PoolWorker, || unsafe {
                    (*borrowed.0)();
                });
            }
        }
        // Reached for every received job once `guard_or_fatal` has returned.
        tracker.complete();
    }
}
impl Pool {
    /// Creates a pool with `n_workers` worker threads.
    ///
    /// With `n_workers == 0` no channel or thread is created and every job
    /// runs inline on the submitting thread. Otherwise a bounded channel with
    /// room for `n_workers * 4` queued jobs is created and `n_workers` named
    /// threads are spawned; each calls `attrs.apply_to_self(i)` and then
    /// enters `run_worker`.
    ///
    /// Thread names are `taktora-pool-{i}`, or `{prefix}-{i}` when the
    /// `thread_attrs` feature is enabled and `attrs.name_prefix` is set.
    ///
    /// # Errors
    ///
    /// Returns `ExecutorError::Builder` when spawning a worker thread fails.
    pub(crate) fn new(
        n_workers: usize,
        attrs: crate::thread_attrs::ThreadAttributes,
        fatal: Arc<FatalDispatch>,
    ) -> Result<Self, ExecutorError> {
        let tracker = Arc::new(Tracker::default());
        if n_workers == 0 {
            let mode = PoolMode::Inline;
            return Ok(Self {
                mode,
                tracker,
                fatal,
            });
        }

        let (tx, rx) = bounded::<Job>(n_workers * 4);
        let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let shared_attrs = Arc::new(attrs);
        let mut handles = Vec::with_capacity(n_workers);

        for i in 0..n_workers {
            #[cfg(feature = "thread_attrs")]
            let thread_name = shared_attrs
                .name_prefix
                .as_ref()
                .map_or_else(|| format!("taktora-pool-{i}"), |p| format!("{p}-{i}"));
            #[cfg(not(feature = "thread_attrs"))]
            let thread_name = format!("taktora-pool-{i}");

            // Per-worker clones that are moved into the thread body.
            let worker_rx = rx.clone();
            let worker_tracker = Arc::clone(&tracker);
            let worker_shutdown = Arc::clone(&shutdown);
            let worker_attrs = Arc::clone(&shared_attrs);
            let worker_fatal = Arc::clone(&fatal);

            let handle = thread::Builder::new()
                .name(thread_name)
                .spawn(move || {
                    worker_attrs.apply_to_self(i);
                    run_worker(worker_rx, worker_tracker, worker_shutdown, worker_fatal);
                })
                .map_err(|e| ExecutorError::Builder(format!("spawn worker: {e}")))?;
            handles.push(handle);
        }

        let mode = PoolMode::Threaded {
            tx,
            handles,
            shutdown,
        };
        Ok(Self {
            mode,
            tracker,
            fatal,
        })
    }

    /// Test-only shorthand for [`Pool::new`] with `ThreadAttributes::new()`.
    #[cfg(test)]
    pub(crate) fn new_with_fatal(
        n_workers: usize,
        fatal: Arc<FatalDispatch>,
    ) -> Result<Self, ExecutorError> {
        let attrs = crate::thread_attrs::ThreadAttributes::new();
        Self::new(n_workers, attrs, fatal)
    }

    /// Submits an owned closure to the pool.
    ///
    /// The job is counted as submitted first. In inline mode it then runs
    /// immediately on the calling thread under `guard_or_fatal` with
    /// `FatalSite::InlineSubmit` and is counted as completed before this
    /// method returns. In threaded mode it is boxed and sent to the workers.
    ///
    /// # Panics
    ///
    /// In threaded mode, panics with `"pool channel closed"` if the job
    /// channel rejects the send.
    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    #[track_caller]
    pub(crate) fn submit<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.tracker.submit();
        match &self.mode {
            PoolMode::Threaded { tx, .. } => {
                let job = Job::Owned(Box::new(f));
                // A failed send means the workers are gone; there is nothing
                // sensible to do but panic.
                #[allow(clippy::expect_used)]
                tx.send(job).expect("pool channel closed");
            }
            PoolMode::Inline => {
                guard_or_fatal(&self.fatal, FatalSite::InlineSubmit, f);
                self.tracker.complete();
            }
        }
    }

    /// Submits a job that is referenced by raw pointer instead of owned.
    ///
    /// Mirrors [`Pool::submit`]: inline mode runs the closure at once on the
    /// calling thread, threaded mode sends the handle to the workers.
    ///
    /// # Safety
    ///
    /// The pointer inside `job` is dereferenced and called when the job runs,
    /// which in threaded mode happens later on another thread. The caller
    /// must keep the pointee valid and otherwise untouched until then.
    /// NOTE(review): waiting on [`Pool::barrier`] looks like the intended way
    /// to know the job has run — confirm with the call sites.
    ///
    /// # Panics
    ///
    /// In threaded mode, panics with `"pool channel closed"` if the job
    /// channel rejects the send.
    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    #[track_caller]
    #[allow(unsafe_code)]
    pub(crate) unsafe fn submit_borrowed(&self, job: BorrowedJob) {
        self.tracker.submit();
        match &self.mode {
            PoolMode::Threaded { tx, .. } => {
                let queued = Job::Borrowed(job);
                #[allow(clippy::expect_used)]
                tx.send(queued).expect("pool channel closed");
            }
            PoolMode::Inline => {
                // SAFETY: the caller of this `unsafe fn` guarantees the
                // pointer is valid for the duration of the call.
                guard_or_fatal(&self.fatal, FatalSite::InlineSubmit, || unsafe {
                    (*job.0)();
                });
                self.tracker.complete();
            }
        }
    }

    /// Blocks until every job submitted so far has been counted as completed.
    pub(crate) fn barrier(&self) {
        let tracker = &self.tracker;
        tracker.wait_for_quiescence();
    }
}
impl Drop for Pool {
    /// Shuts a threaded pool down and joins its workers; an inline pool has
    /// nothing to clean up.
    ///
    /// Workers check the shutdown flag before each receive, so jobs that are
    /// still queued at this point may never run.
    fn drop(&mut self) {
        match &mut self.mode {
            PoolMode::Inline => {}
            PoolMode::Threaded {
                tx,
                handles,
                shutdown,
            } => {
                shutdown.store(true, Ordering::Release);

                // Swap in the sender of a throwaway channel so the real
                // sender can be dropped here; workers blocked in `recv` then
                // get an error once the queue is empty and exit.
                let (placeholder_tx, placeholder_rx) = bounded::<Job>(0);
                drop(placeholder_rx);
                drop(std::mem::replace(tx, placeholder_tx));

                for worker in handles.drain(..) {
                    // The join result is deliberately ignored.
                    let _ = worker.join();
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fatal::FatalDispatch;
    use crate::thread_attrs::ThreadAttributes;
    use std::sync::atomic::AtomicU32;

    /// Shared log of `(site, cause)` pairs captured by [`recording_fatal`].
    type Recorder = Arc<Mutex<Vec<(FatalSite, String)>>>;

    /// Environment variable that switches the abort test into its child role.
    #[cfg(unix)]
    const ABORT_CHILD_ENV: &str = "TAKTORA_EXECUTOR_ABORT_CHILD";

    /// Dispatcher built with `FatalDispatch::new` and a do-nothing callback.
    fn noop_fatal() -> Arc<FatalDispatch> {
        let dispatch = FatalDispatch::new(Arc::new(|_| {}));
        Arc::new(dispatch)
    }

    /// Dispatcher whose terminal hook appends `(site, cause)` to a shared log
    /// instead of ending the process; returns the log and the dispatcher.
    fn recording_fatal() -> (Recorder, Arc<FatalDispatch>) {
        let log = Recorder::default();
        let sink = Arc::clone(&log);
        let dispatch = FatalDispatch::with_terminal(Arc::new(|_| {}), move |ctx| {
            sink.lock().unwrap().push((ctx.site, ctx.cause.clone()));
        });
        (log, Arc::new(dispatch))
    }

    /// A panic in an inline job is reported once with the inline site.
    #[test]
    fn inline_pool_panic_fires_fatal_with_inline_submit_site() {
        let (log, fatal) = recording_fatal();
        let pool = Pool::new_with_fatal(0, fatal).unwrap();

        pool.submit(|| panic!("synthetic infra panic"));
        pool.barrier();

        let recorded = log.lock().unwrap().clone();
        assert_eq!(recorded.len(), 1, "fatal must fire exactly once");
        let (site, cause) = &recorded[0];
        assert_eq!(*site, FatalSite::InlineSubmit);
        assert_eq!(cause.as_str(), "synthetic infra panic");
    }

    /// A panic in a worker-thread job is reported once with the worker site.
    #[test]
    fn threaded_pool_panic_fires_fatal_with_pool_worker_site() {
        let (log, fatal) = recording_fatal();
        let pool = Pool::new_with_fatal(2, fatal).unwrap();

        pool.submit(|| panic!("synthetic infra panic"));
        pool.barrier();

        let recorded = log.lock().unwrap().clone();
        assert_eq!(recorded.len(), 1, "fatal must fire exactly once");
        let (site, cause) = &recorded[0];
        assert_eq!(*site, FatalSite::PoolWorker);
        assert_eq!(cause.as_str(), "synthetic infra panic");
    }

    /// Re-runs this very test in a child process with `ABORT_CHILD_ENV` set.
    /// The child submits a panicking job to a pool using a `FatalDispatch::new`
    /// dispatcher; the parent asserts that the child was killed by signal 6.
    #[cfg(unix)]
    #[test]
    fn pool_panic_aborts_process_with_sigabrt() {
        use std::os::unix::process::ExitStatusExt;

        /// Signal number the child is expected to die from.
        const SIGABRT: i32 = 6;

        // Child role: trigger the panic; exiting with 0 means no abort happened.
        if std::env::var(ABORT_CHILD_ENV).is_ok() {
            let pool = Pool::new(2, ThreadAttributes::new(), noop_fatal()).unwrap();
            pool.submit(|| panic!("synthetic infra panic in child"));
            pool.barrier();
            std::process::exit(0);
        }

        // Parent role: spawn the child and poll it until it exits or times out.
        let max_wait = std::time::Duration::from_secs(30);
        let poll_interval = std::time::Duration::from_millis(50);
        let test_binary = std::env::current_exe().expect("current_exe");

        let mut command = std::process::Command::new(test_binary);
        command
            .arg("--exact")
            .arg("pool::tests::pool_panic_aborts_process_with_sigabrt")
            .env(ABORT_CHILD_ENV, "1")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null());
        let mut child = command.spawn().expect("spawn child test process");

        let deadline = std::time::Instant::now() + max_wait;
        let status = loop {
            let polled = child.try_wait().expect("try_wait on child test process");
            match polled {
                Some(exit) => break exit,
                None if std::time::Instant::now() >= deadline => {
                    let _ = child.kill();
                    panic!("child did not abort within 30s");
                }
                None => std::thread::sleep(poll_interval),
            }
        };

        let signal = status.signal();
        assert_eq!(
            signal,
            Some(SIGABRT),
            "child must die via SIGABRT (signal 6); status: {status:?}"
        );
    }

    /// Ten jobs on a zero-worker pool have all run by the time `barrier` returns.
    #[test]
    fn inline_pool_runs_synchronously() {
        let pool = Pool::new(0, ThreadAttributes::new(), noop_fatal()).unwrap();
        let hits = Arc::new(AtomicU32::new(0));

        for _ in 0..10 {
            let hits = Arc::clone(&hits);
            pool.submit(move || {
                hits.fetch_add(1, Ordering::SeqCst);
            });
        }
        pool.barrier();

        assert_eq!(hits.load(Ordering::SeqCst), 10);
    }

    /// One hundred short jobs on four workers are all done after `barrier`.
    #[test]
    fn threaded_pool_runs_concurrently_and_barriers() {
        let pool = Pool::new(4, ThreadAttributes::new(), noop_fatal()).unwrap();
        let hits = Arc::new(AtomicU32::new(0));

        for _ in 0..100 {
            let hits = Arc::clone(&hits);
            pool.submit(move || {
                std::thread::sleep(std::time::Duration::from_millis(1));
                hits.fetch_add(1, Ordering::SeqCst);
            });
        }
        pool.barrier();

        assert_eq!(hits.load(Ordering::SeqCst), 100);
    }

    /// `barrier` on a pool that never received a job must not block.
    #[test]
    fn barrier_with_no_work_returns_immediately() {
        let pool = Pool::new(2, ThreadAttributes::new(), noop_fatal()).unwrap();
        pool.barrier();
    }

    /// A panicking job still counts as completed, and only it raises a fatal.
    #[test]
    fn submitted_panic_fires_fatal_and_completion_is_counted() {
        let (log, fatal) = recording_fatal();
        let pool = Pool::new_with_fatal(2, fatal).unwrap();

        pool.submit(|| panic!("kaboom"));
        pool.submit(|| {});
        pool.barrier();

        let submitted = pool.tracker.submitted.load(Ordering::SeqCst);
        let completed = pool.tracker.completed.load(Ordering::SeqCst);
        assert_eq!(
            submitted, completed,
            "submitted vs completed counters diverged after panic"
        );

        let recorded = log.lock().unwrap().clone();
        assert_eq!(recorded.len(), 1, "exactly one fatal should have fired");
        let (site, cause) = &recorded[0];
        assert_eq!(*site, FatalSite::PoolWorker);
        assert_eq!(cause.as_str(), "kaboom");
    }
}