use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use parking_lot::RwLock;
use rayon::ThreadPool;
use super::config::WorkerPoolConfig;
/// A rayon-backed worker pool whose effective parallelism can be tuned at
/// run time: the rayon pool is sized to `max_threads`, while a permit
/// semaphore throttles how many work items may run concurrently.
pub struct AdaptiveWorkerPool {
    /// Resolved pool configuration, shared with the scaling controller.
    pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
    /// Fixed-size rayon pool; actual concurrency is gated by `semaphore`.
    rayon_pool: ThreadPool,
    /// Counting semaphore that bounds in-flight work items.
    pub(crate) semaphore: Arc<Semaphore>,
    /// Optional memory hook, installed via `set_memory_guard`.
    #[cfg(feature = "memory")]
    pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
    /// Optional pressure source, installed via `set_scaling_pressure`.
    #[cfg(feature = "scaling")]
    pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
}
/// A minimal counting semaphore built on a single atomic counter.
///
/// `acquire` spin-waits (yielding between attempts) rather than parking the
/// thread, so it is suited to short-lived gating of CPU-bound work, not
/// long blocking waits.
pub(crate) struct Semaphore {
    /// Currently available permits.
    permits: AtomicUsize,
    /// Upper bound that `set_permits` clamps requests to.
    max_permits: usize,
}
impl Semaphore {
    /// Creates a semaphore with `initial_permits` available and a ceiling
    /// of `max_permits` enforced by `set_permits`.
    fn new(initial_permits: usize, max_permits: usize) -> Self {
        Self {
            permits: AtomicUsize::new(initial_permits),
            max_permits,
        }
    }

    /// Acquires one permit, spin-waiting until one becomes available.
    ///
    /// Each failed attempt yields the thread (`yield_now`) instead of
    /// parking, so waiters burn scheduler slices while blocked. The
    /// returned RAII guard records how long acquisition took and returns
    /// the permit when dropped.
    fn acquire(&self) -> SemaphoreGuard<'_> {
        let start = Instant::now();
        loop {
            let current = self.permits.load(Ordering::Acquire);
            // Decrement only if a permit was visible; `compare_exchange_weak`
            // may fail spuriously or on contention, in which case we fall
            // through to yield and retry the whole loop.
            if current > 0
                && self
                    .permits
                    .compare_exchange_weak(
                        current,
                        current - 1,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    )
                    .is_ok()
            {
                return SemaphoreGuard {
                    semaphore: self,
                    wait_duration: start.elapsed(),
                };
            }
            std::thread::yield_now();
        }
    }

    /// Sets the available permit count to `count`, clamped to `max_permits`.
    ///
    /// NOTE(review): this is an absolute store, so it races with guards
    /// returning permits in `SemaphoreGuard::drop`. If called while permits
    /// are checked out, later guard drops can push the counter above the
    /// requested `count` — confirm the scaling controller tolerates this.
    pub(crate) fn set_permits(&self, count: usize) {
        let clamped = count.min(self.max_permits);
        self.permits.store(clamped, Ordering::Release);
    }

    /// Best-effort snapshot of currently available permits (relaxed load;
    /// advisory only — may be stale by the time the caller uses it).
    pub(crate) fn available_permits(&self) -> usize {
        self.permits.load(Ordering::Relaxed)
    }
}
/// RAII permit handle; returns its permit to the semaphore on drop.
struct SemaphoreGuard<'a> {
    semaphore: &'a Semaphore,
    /// How long `acquire` spun before a permit became available; currently
    /// unused but kept for wait-time instrumentation.
    #[allow(dead_code)]
    wait_duration: std::time::Duration,
}
impl Drop for SemaphoreGuard<'_> {
    /// Returns the permit on scope exit.
    ///
    /// Uses a clamped CAS loop (`fetch_update`) instead of a plain
    /// `fetch_add(1)`: `Semaphore::set_permits` stores an absolute value
    /// while guards may still be outstanding, and an unconditional
    /// increment from those guards could otherwise push the counter past
    /// the configured `max_permits` ceiling.
    fn drop(&mut self) {
        let permits = &self.semaphore.permits;
        let max = self.semaphore.max_permits;
        // The closure always returns `Some`, so the update cannot fail;
        // `Release` on success publishes the permit to future acquirers.
        let _ = permits.fetch_update(Ordering::Release, Ordering::Relaxed, |p| {
            Some((p + 1).min(max))
        });
    }
}
impl AdaptiveWorkerPool {
    /// Builds a pool from `config`.
    ///
    /// The rayon pool is sized to the resolved `max_threads`, while the
    /// permit semaphore starts at `min_threads`, so effective parallelism
    /// begins conservatively and can later be raised (e.g. by the scaling
    /// loop) up to `max_threads`.
    ///
    /// # Panics
    ///
    /// Panics if the rayon thread pool cannot be built.
    #[must_use]
    pub fn new(config: WorkerPoolConfig) -> Self {
        let mut resolved = config;
        resolved.resolve_max_threads();
        let max_threads = resolved.max_threads;
        let min_threads = resolved.min_threads;
        let rayon_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(max_threads)
            .thread_name(|i| format!("worker-{i}"))
            .build()
            .expect("Failed to create rayon thread pool");
        let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
        Self {
            config: Arc::new(RwLock::new(resolved)),
            rayon_pool,
            semaphore,
            #[cfg(feature = "memory")]
            memory_guard: parking_lot::Mutex::new(None),
            #[cfg(feature = "scaling")]
            scaling_pressure: parking_lot::Mutex::new(None),
        }
    }

    /// Builds a pool from the configuration cascade under `key`.
    ///
    /// # Errors
    ///
    /// Propagates any [`crate::config::ConfigError`] from loading the
    /// cascade.
    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
        let config = WorkerPoolConfig::from_cascade(key)?;
        Ok(Self::new(config))
    }

    /// Applies `f` to every item in parallel on the rayon pool, returning
    /// results in input order.
    ///
    /// Each item acquires a semaphore permit before running, so the number
    /// of items processed concurrently follows the current permit count
    /// rather than the rayon pool's fixed thread count.
    pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
    where
        T: Sync,
        R: Send,
        E: Send,
        F: Fn(&T) -> Result<R, E> + Sync,
    {
        let sem = &self.semaphore;
        self.rayon_pool.install(|| {
            use rayon::prelude::*;
            items
                .par_iter()
                .map(|item| {
                    // Permit is held for the duration of `f`; released on drop.
                    let _permit = sem.acquire();
                    f(item)
                })
                .collect()
        })
    }

    /// Runs `f` over `items` as spawned tokio tasks, at most
    /// `async_concurrency` in flight at a time (chunked: each chunk is
    /// spawned, then fully awaited before the next chunk begins).
    ///
    /// Successful results are returned in input order. If a task panics,
    /// the join error is logged and that item's slot is dropped, so the
    /// returned vector may be shorter than `items` — callers needing
    /// per-item correlation should encode the index in `R`/`E`.
    pub async fn fan_out_async<T, R, E, F, Fut>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
    where
        T: Sync + Send,
        R: Send + 'static,
        E: Send + 'static,
        F: Fn(&T) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
    {
        // Clamp to at least 1: `Iterator::step_by` panics on a zero step,
        // so a misconfigured `async_concurrency == 0` would panic here.
        let concurrency = self.config.read().async_concurrency.max(1);
        let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
        for chunk_start in (0..items.len()).step_by(concurrency) {
            let chunk_end = (chunk_start + concurrency).min(items.len());
            let mut handles = Vec::with_capacity(chunk_end - chunk_start);
            for (idx, item) in items
                .iter()
                .enumerate()
                .skip(chunk_start)
                .take(chunk_end - chunk_start)
            {
                // `Fut: 'static` guarantees the future does not borrow `item`,
                // so spawning it off this stack frame is sound.
                let fut = f(item);
                handles.push((idx, tokio::spawn(fut)));
            }
            for (idx, handle) in handles {
                match handle.await {
                    Ok(result) => results[idx] = Some(result),
                    Err(join_err) => {
                        tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
                    }
                }
            }
        }
        // `None` slots (panicked tasks) are filtered out here.
        results.into_iter().flatten().collect()
    }

    /// Runs `f` inside the rayon pool (see `rayon::ThreadPool::install`).
    pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
        self.rayon_pool.install(f)
    }

    /// Registers this pool's metrics with `manager` using the current
    /// configuration snapshot.
    pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
        let config = self.config.read();
        super::metrics::register(manager, &config);
    }

    /// Spawns the background scaling loop; it runs until `cancel` fires.
    pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
        let controller = super::scaler::ScalingController::new(self.clone());
        tokio::spawn(controller.run(cancel));
    }

    /// Installs the memory guard hook.
    #[cfg(feature = "memory")]
    pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
        *self.memory_guard.lock() = Some(guard);
    }

    /// Installs the scaling-pressure hook.
    #[cfg(feature = "scaling")]
    pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
        *self.scaling_pressure.lock() = Some(pressure);
    }

    /// Best-effort count of threads currently doing work: `max_threads`
    /// minus available permits. Saturating, because the permit count is a
    /// racy snapshot and may momentarily exceed `max_threads`.
    #[must_use]
    pub fn active_threads(&self) -> usize {
        let cfg = self.config.read();
        cfg.max_threads
            .saturating_sub(self.semaphore.available_permits())
    }

    /// The resolved upper bound on worker threads.
    #[must_use]
    pub fn max_threads(&self) -> usize {
        self.config.read().max_threads
    }
}