use std::{thread, time::Duration};
use super::*;
use crate::prelude::*;
use crossfire::{AsyncRx, MAsyncRx, MRx, MTx, RecvTimeoutError, Rx, mpmc, mpsc, null::CloseHandle};
#[derive(Clone)]
pub struct WorkerPoolUnbounded<M>
where
M: Send + Sized + Unpin + 'static,
{
tx: MTx<mpmc::List<M>>,
inner: Arc<WorkerPoolInner>,
_close_h: CloseHandle<mpsc::Null>,
}
impl<W: WorkerAsync> WorkerPoolBuilder<WorkerPoolUnbounded<W::Msg>, W> {
pub fn new_async<RT>(mut self, rt: Option<RT::Exec>) -> WorkerPoolUnbounded<W::Msg>
where
RT: AsyncRuntime,
{
let worker = self.worker.take().expect("worker must been set");
let inner = self.build();
let (tx, rx) = mpmc::unbounded_async::<W::Msg>();
let (close_h, close_rx) = mpsc::new();
WorkerPoolInner::init_async::<W, RT, _>(&inner, rt.as_ref(), &worker, &rx);
if inner.max_workers > inner.min_workers {
let f = WorkerPoolUnbounded::<W::Msg>::watcher_async::<W, RT>(
inner.clone(),
worker,
rx,
close_rx,
);
if let Some(_rt) = rt.as_ref() {
_rt.spawn_detach(f);
} else {
RT::spawn_detach(f);
}
}
WorkerPoolUnbounded::<W::Msg> { tx, inner, _close_h: close_h }
}
}
impl<W: WorkerBlocking> WorkerPoolBuilder<WorkerPoolUnbounded<W::Msg>, W> {
pub fn new_blocking(mut self) -> WorkerPoolUnbounded<W::Msg> {
let worker = self.worker.take().expect("worker must been set");
let inner = self.build();
let (tx, rx) = mpmc::unbounded_blocking::<W::Msg>();
let (close_h, close_rx) = mpsc::new();
WorkerPoolInner::init_blocking::<W, _>(&inner, &worker, &rx);
if inner.max_workers > inner.min_workers {
let _inner = inner.clone();
thread::spawn(move || {
WorkerPoolUnbounded::<W::Msg>::watcher_blocking::<W>(_inner, worker, rx, close_rx);
});
}
WorkerPoolUnbounded::<W::Msg> { tx, inner, _close_h: close_h }
}
}
impl<M> WorkerPoolUnbounded<M>
where
M: Send + Sized + Unpin + 'static,
{
#[inline]
pub fn builder<W>(worker: W, min_workers: usize) -> WorkerPoolBuilder<WorkerPoolUnbounded<M>, W>
where
W: Worker<Msg = M>,
{
WorkerPoolBuilder {
worker: Some(worker),
min_workers,
max_workers: min_workers,
timeout: ZERO_DUR,
spawn_inv: ZERO_DUR,
_phan: Default::default(),
}
}
#[inline(always)]
pub fn submit(&self, msg: M) {
self.tx.send(msg).expect("send");
}
async fn watcher_async<W, RT>(
inner: Arc<WorkerPoolInner>, worker: W, rx: MAsyncRx<mpmc::List<M>>,
close_rx: AsyncRx<mpsc::Null>,
) where
W: WorkerAsync<Msg = M>,
RT: AsyncRuntime,
{
let mut inv = inner.timeout;
if inv > Duration::from_secs(1) {
inv = Duration::from_secs(1);
}
loop {
match close_rx.recv_with_timer(RT::sleep(inv)).await {
Err(RecvTimeoutError::Timeout) => {
let worker_count = inner.worker_count();
if worker_count > inner.max_workers {
continue;
}
if rx.len() > worker_count {
inner.clone().spawn_async_worker::<W, RT, _>(
None,
worker.clone(),
true,
rx.clone(),
);
RT::sleep(inner.spawn_inv).await;
}
}
Err(RecvTimeoutError::Disconnected) => {
return;
}
Ok(_) => unreachable!(),
}
}
}
fn watcher_blocking<W>(
inner: Arc<WorkerPoolInner>, worker: W, rx: MRx<mpmc::List<M>>, close_rx: Rx<mpsc::Null>,
) where
W: WorkerBlocking<Msg = M>,
{
let mut inv = inner.timeout;
if inv > Duration::from_secs(1) {
inv = Duration::from_secs(1);
}
loop {
match close_rx.recv_timeout(inv) {
Err(RecvTimeoutError::Timeout) => {
let worker_count = inner.worker_count();
if worker_count > inner.max_workers {
continue;
}
if rx.len() > worker_count {
inner.clone().spawn_blocking_worker::<W, _>(
worker.clone(),
true,
rx.clone(),
);
thread::sleep(inner.spawn_inv);
}
}
Err(RecvTimeoutError::Disconnected) => {
return;
}
Ok(_) => unreachable!(),
}
}
}
#[inline]
pub fn worker_count(&self) -> usize {
self.inner.worker_count()
}
}