use crate::{
error::MechanicsError,
http::{ReqwestEndpointHttpClient, into_io_error},
job::MechanicsJob,
};
use crossbeam_channel::{
RecvTimeoutError, SendTimeoutError, Sender, TrySendError, bounded, select, tick, unbounded,
};
use serde_json::Value;
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use super::{
config::MechanicsPoolConfig, shared::MechanicsPoolShared, worker::{PoolJob, PoolMessage},
};
/// A pool of worker threads that execute [`MechanicsJob`]s, plus a
/// supervisor thread that reaps exited workers and respawns replacements.
pub struct MechanicsPool {
    // State shared between this handle, the workers, and the supervisor.
    pub(crate) shared: Arc<MechanicsPoolShared>,
    // Upper bound on how long `run` blocks waiting for queue space.
    pub(crate) enqueue_timeout: Duration,
    // Overall budget for one job: enqueue wait + execution + reply wait.
    pub(crate) run_timeout: Duration,
    // Supervisor thread handle; `Option` presumably so a shutdown path
    // elsewhere can `take()` and join it — confirm against the close logic.
    pub(crate) supervisor: Option<thread::JoinHandle<()>>,
    // Signals the supervisor loop to exit; `Option` for the same reason.
    pub(crate) supervisor_shutdown_tx: Option<Sender<()>>,
}
/// Point-in-time snapshot of pool health, produced by [`MechanicsPool::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MechanicsPoolStats {
    /// Whether the pool has been closed to new jobs.
    pub is_closed: bool,
    /// Whether the restart guard is currently refusing worker respawns.
    pub restart_blocked: bool,
    /// Number of workers the pool aims to keep alive.
    pub desired_workers: usize,
    /// Workers currently tracked in the registry (live or already finished).
    pub known_workers: usize,
    /// `known_workers` minus the finished-but-unreaped ones.
    pub live_workers: usize,
    /// Finished workers still awaiting removal by the supervisor.
    pub finished_workers_pending_reap: usize,
    /// Jobs currently waiting in the queue.
    pub queue_depth: usize,
    /// Queue capacity; `None` when no bound is reported.
    pub queue_capacity: Option<usize>,
    /// Restart attempts counted within the current guard window.
    pub restart_attempts_in_window: usize,
    /// Maximum restarts allowed per guard window.
    pub max_restarts_in_window: usize,
}
impl MechanicsPool {
    /// Converts the relative `timeout` into an absolute deadline.
    ///
    /// # Errors
    /// Fails when `Instant::now() + timeout` is not representable by the
    /// platform's monotonic clock.
    fn deadline_from_timeout(timeout: Duration) -> Result<Instant, MechanicsError> {
        Instant::now().checked_add(timeout).ok_or_else(|| {
            MechanicsError::runtime_pool("run_timeout is too large for the current platform clock")
        })
    }

    /// Returns the time left until `deadline`, or `None` once it has passed.
    fn remaining_to_deadline(deadline: Instant) -> Option<Duration> {
        let now = Instant::now();
        (now < deadline).then(|| deadline.duration_since(now))
    }

    /// Rejects new work when the pool is closed, or when no worker is alive
    /// and the restart guard is refusing to spawn replacements.
    fn ensure_accepting(&self) -> Result<(), MechanicsError> {
        if self.shared.is_closed() {
            return Err(MechanicsError::pool_closed("runtime pool is closed"));
        }
        if self.shared.is_restart_blocked() && self.shared.live_workers() == 0 {
            return Err(MechanicsError::worker_unavailable(
                "all workers are unavailable and restart guard is active",
            ));
        }
        Ok(())
    }

    /// Waits on `reply_rx` for the worker's result until `deadline`.
    ///
    /// On timeout the shared `canceled` flag is raised so a worker that later
    /// dequeues (or is still executing) the job can abandon it.
    fn await_reply(
        reply_rx: &crossbeam_channel::Receiver<Result<Value, MechanicsError>>,
        canceled: &AtomicBool,
        deadline: Instant,
    ) -> Result<Value, MechanicsError> {
        let Some(remaining) = Self::remaining_to_deadline(deadline) else {
            canceled.store(true, Ordering::Release);
            return Err(MechanicsError::run_timeout(
                "run timeout elapsed while waiting for worker reply",
            ));
        };
        match reply_rx.recv_timeout(remaining) {
            Ok(result) => result,
            Err(RecvTimeoutError::Timeout) => {
                canceled.store(true, Ordering::Release);
                Err(MechanicsError::run_timeout(
                    "run timeout elapsed while waiting for worker reply",
                ))
            }
            // Exhaustive on purpose: a new variant should be a compile error.
            Err(RecvTimeoutError::Disconnected) => Err(MechanicsError::worker_unavailable(
                "worker dropped reply channel",
            )),
        }
    }

    /// Returns a point-in-time snapshot of pool health counters.
    ///
    /// The counters come from several independent reads, so they may be
    /// mutually slightly stale relative to each other.
    pub fn stats(&self) -> MechanicsPoolStats {
        // Take the worker-registry lock once for both derived counts.
        let (known_workers, finished_workers_pending_reap) = {
            let workers = self.shared.workers_read();
            let known = workers.len();
            let finished = workers.values().filter(|h| h.is_finished()).count();
            (known, finished)
        };
        let (restart_attempts_in_window, max_restarts_in_window) =
            self.shared.restart_guard_snapshot();
        MechanicsPoolStats {
            is_closed: self.shared.is_closed(),
            restart_blocked: self.shared.is_restart_blocked(),
            desired_workers: self.shared.desired_worker_count(),
            known_workers,
            live_workers: known_workers.saturating_sub(finished_workers_pending_reap),
            finished_workers_pending_reap,
            queue_depth: self.shared.queue_depth(),
            queue_capacity: self.shared.queue_capacity(),
            restart_attempts_in_window,
            max_restarts_in_window,
        }
    }

    /// Builds a pool from `config`: validates it, constructs (or reuses) the
    /// endpoint HTTP client, spawns the initial workers, and starts the
    /// supervisor thread that reaps exited workers and reconciles the pool
    /// back to the desired worker count.
    ///
    /// # Errors
    /// Returns an error when the config is invalid, the HTTP client cannot
    /// be built, or a worker/supervisor thread fails to spawn.
    pub fn new(config: MechanicsPoolConfig) -> Result<Self, MechanicsError> {
        config.validate()?;
        let endpoint_http_client = if let Some(client) = config.endpoint_http_client.clone() {
            client
        } else {
            let reqwest_client = reqwest::Client::builder()
                .build()
                .map_err(into_io_error)
                .map_err(|e| MechanicsError::runtime_pool(e.to_string()))?;
            Arc::new(ReqwestEndpointHttpClient::new(reqwest_client))
        };
        let (tx, rx) = bounded(config.queue_capacity);
        let (exit_tx, exit_rx) = unbounded();
        let shared = Arc::new(MechanicsPoolShared::new(
            &config,
            endpoint_http_client,
            tx,
            rx,
            exit_tx,
            exit_rx,
        ));
        for _ in 0..config.worker_count() {
            MechanicsPoolShared::spawn_worker(&shared)?;
        }
        let supervisor_shared = Arc::clone(&shared);
        let (supervisor_shutdown_tx, supervisor_shutdown_rx) = bounded::<()>(1);
        // Periodic wake-up so reconciliation runs even when no worker-exit
        // events arrive.
        let reconcile_tick = tick(Self::reconcile_interval(config.restart_window()));
        let supervisor = thread::Builder::new()
            .name("mechanics-supervisor".to_owned())
            .spawn(move || {
                loop {
                    select! {
                        recv(supervisor_shutdown_rx) -> _ => {
                            // Explicit shutdown (or sender dropped): stop.
                            break;
                        }
                        recv(supervisor_shared.worker_exit_receiver()) -> event => {
                            match event {
                                Ok(event) => {
                                    // Release the registry lock before joining
                                    // the exited worker.
                                    let maybe_old = {
                                        let mut workers = supervisor_shared.workers_write();
                                        workers.remove(&event.worker_id())
                                    };
                                    if let Some(handle) = maybe_old {
                                        // Deliberately discard the join result:
                                        // a panicked worker is replaced by
                                        // reconciliation below.
                                        let _ = handle.join();
                                    }
                                }
                                // Exit channel closed: nothing can report, stop.
                                Err(_) => break,
                            }
                        }
                        recv(reconcile_tick) -> _ => {}
                    }
                    if supervisor_shared.is_closed() {
                        break;
                    }
                    MechanicsPoolShared::reconcile_workers(&supervisor_shared);
                }
            })
            .map_err(|e| {
                MechanicsError::runtime_pool(format!("failed to spawn supervisor thread: {e}"))
            })?;
        Ok(Self {
            shared,
            enqueue_timeout: config.enqueue_timeout(),
            run_timeout: config.run_timeout(),
            supervisor: Some(supervisor),
            supervisor_shutdown_tx: Some(supervisor_shutdown_tx),
        })
    }

    /// Supervisor wake-up period: a quarter of the restart window, clamped to
    /// [50ms, 500ms] so reconciliation is neither too chatty nor too sluggish
    /// for extreme window configurations.
    pub(crate) fn reconcile_interval(restart_window: Duration) -> Duration {
        restart_window
            .div_f64(4.0)
            .clamp(Duration::from_millis(50), Duration::from_millis(500))
    }

    /// Submits `job` and blocks for the result.
    ///
    /// Waits up to `enqueue_timeout` (capped by the remaining `run_timeout`
    /// budget) for queue space, then waits for the worker's reply until the
    /// run deadline.
    ///
    /// # Errors
    /// - pool closed, or restart guard active with no live workers;
    /// - run timeout (before enqueue, while enqueuing, or awaiting the reply);
    /// - queue timeout when the queue stayed full past `enqueue_timeout`;
    /// - worker unavailable when the queue or reply channel disconnected.
    pub fn run(&self, job: MechanicsJob) -> Result<Value, MechanicsError> {
        self.ensure_accepting()?;
        let deadline = Self::deadline_from_timeout(self.run_timeout)?;
        let (reply_tx, reply_rx) = bounded(1);
        let canceled = Arc::new(AtomicBool::new(false));
        let message = PoolMessage::Run(PoolJob::new(job, reply_tx, Arc::clone(&canceled)));
        let Some(remaining_for_enqueue) = Self::remaining_to_deadline(deadline) else {
            canceled.store(true, Ordering::Release);
            return Err(MechanicsError::run_timeout(
                "run timeout elapsed before enqueue",
            ));
        };
        // Never block on the queue longer than the overall run budget allows.
        let enqueue_wait = self.enqueue_timeout.min(remaining_for_enqueue);
        let limited_by_run_timeout = enqueue_wait == remaining_for_enqueue;
        match self.shared.job_sender().send_timeout(message, enqueue_wait) {
            Ok(()) => {}
            Err(SendTimeoutError::Timeout(PoolMessage::Run(pool_job))) => {
                // The message was handed back, so no worker will ever see it;
                // resolve its reply channel before reporting the failure.
                if limited_by_run_timeout {
                    pool_job.mark_canceled();
                    pool_job.send_result(Err(MechanicsError::run_timeout(
                        "run timeout elapsed while waiting to enqueue",
                    )));
                    return Err(MechanicsError::run_timeout(
                        "run timeout elapsed while waiting to enqueue",
                    ));
                }
                pool_job.send_result(Err(MechanicsError::queue_timeout(
                    "enqueue timed out because queue is full",
                )));
                return Err(MechanicsError::queue_timeout(
                    "enqueue timed out because queue is full",
                ));
            }
            Err(SendTimeoutError::Disconnected(_)) => {
                return Err(MechanicsError::worker_unavailable(
                    "job queue disconnected from workers",
                ));
            }
        }
        Self::await_reply(&reply_rx, &canceled, deadline)
    }

    /// Submits `job` without blocking on a full queue, then waits for the
    /// result until the run deadline.
    ///
    /// # Errors
    /// - pool closed, or restart guard active with no live workers;
    /// - queue full at the moment of submission;
    /// - run timeout while awaiting the worker's reply;
    /// - worker unavailable when the queue or reply channel disconnected.
    pub fn run_nonblocking_enqueue(&self, job: MechanicsJob) -> Result<Value, MechanicsError> {
        self.ensure_accepting()?;
        let deadline = Self::deadline_from_timeout(self.run_timeout)?;
        let (reply_tx, reply_rx) = bounded(1);
        let canceled = Arc::new(AtomicBool::new(false));
        let message = PoolMessage::Run(PoolJob::new(job, reply_tx, Arc::clone(&canceled)));
        match self.shared.job_sender().try_send(message) {
            Ok(()) => {}
            Err(TrySendError::Full(PoolMessage::Run(_))) => {
                return Err(MechanicsError::queue_full("queue is full"));
            }
            Err(TrySendError::Disconnected(_)) => {
                return Err(MechanicsError::worker_unavailable(
                    "job queue disconnected from workers",
                ));
            }
        }
        Self::await_reply(&reply_rx, &canceled, deadline)
    }
}