use super::*;
use crate::prelude::*;
use crossfire::{
AsyncRx, MAsyncRx, MAsyncTx, MRx, MTx, RecvTimeoutError, Rx, TrySendError, mpmc, mpsc,
};
use std::{sync::Arc, thread, time::Duration};
#[derive(Clone)]
pub struct WorkerPoolBounded<M>
where
M: Send + Sized + Unpin + 'static,
{
tx: MTx<mpmc::Array<M>>,
tx_async: MAsyncTx<mpmc::Array<M>>,
inner: Arc<WorkerPoolInner>,
noti_tx: MTx<mpsc::One<()>>,
auto: bool,
}
impl<W: WorkerAsync> WorkerPoolBuilder<WorkerPoolBounded<W::Msg>, W> {
pub fn new_async<RT>(mut self, bound: usize, rt: Option<RT::Exec>) -> WorkerPoolBounded<W::Msg>
where
RT: AsyncRuntime,
{
let worker = self.worker.take().expect("worker must been set");
let inner = self.build();
let auto = inner.min_workers < inner.max_workers;
let (tx_async, rx) = mpmc::bounded_async::<W::Msg>(bound);
let tx = tx_async.clone().into_blocking();
let (noti_tx, noti_rx) = mpsc::new();
WorkerPoolInner::init_async::<W, RT, _>(&inner, rt.as_ref(), &worker, &rx);
if inner.max_workers > inner.min_workers {
let f = WorkerPoolBounded::<W::Msg>::watcher_async::<W, RT>(
inner.clone(),
worker,
rx,
noti_rx,
);
if let Some(_rt) = rt.as_ref() {
_rt.spawn_detach(f);
} else {
RT::spawn_detach(f);
}
}
WorkerPoolBounded::<W::Msg> { tx, tx_async, inner, noti_tx, auto }
}
}
impl<W: WorkerBlocking> WorkerPoolBuilder<WorkerPoolBounded<W::Msg>, W> {
pub fn new_blocking(mut self, bound: usize) -> WorkerPoolBounded<W::Msg> {
let worker = self.worker.take().expect("worker must been set");
let inner = self.build();
let auto = inner.min_workers < inner.max_workers;
let (tx, rx) = mpmc::bounded_blocking::<W::Msg>(bound);
let tx_async = tx.clone().into_async();
let (noti_tx, noti_rx) = mpsc::new();
WorkerPoolInner::init_blocking::<W, _>(&inner, &worker, &rx);
if inner.max_workers > inner.min_workers {
let _inner = inner.clone();
thread::spawn(move || {
WorkerPoolBounded::<W::Msg>::watcher_blocking::<W>(_inner, worker, rx, noti_rx);
});
}
WorkerPoolBounded::<W::Msg> { tx, tx_async, inner, noti_tx, auto }
}
}
impl<M> WorkerPoolBounded<M>
where
M: Send + Sized + Unpin + 'static,
{
#[inline]
pub fn builder<W>(worker: W, min_workers: usize) -> WorkerPoolBuilder<WorkerPoolBounded<M>, W>
where
W: Worker<Msg = M>,
{
WorkerPoolBuilder {
worker: Some(worker),
min_workers,
max_workers: min_workers,
timeout: ZERO_DUR,
spawn_inv: ZERO_DUR,
_phan: Default::default(),
}
}
#[inline]
pub fn submit(&self, msg: M) {
if self.auto {
if let Err(TrySendError::Full(_msg)) = self.tx.try_send(msg) {
let _ = self.noti_tx.try_send(());
self.tx.send(_msg).expect("send");
}
} else {
self.tx.send(msg).expect("send");
}
}
#[inline]
pub async fn submit_async(&self, msg: M) {
if self.auto {
if let Err(TrySendError::Full(msg)) = self.tx.try_send(msg) {
let _ = self.noti_tx.try_send(());
self.tx_async.send(msg).await.expect("send");
}
} else {
self.tx_async.send(msg).await.expect("send");
}
}
#[inline]
pub fn try_submit(&self, msg: M) -> Result<(), M> {
match self.tx.try_send(msg) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_msg)) => return Err(_msg),
_ => unreachable!(),
}
}
#[inline]
pub fn worker_count(&self) -> usize {
self.inner.worker_count()
}
async fn watcher_async<W, RT>(
inner: Arc<WorkerPoolInner>, worker: W, rx: MAsyncRx<mpmc::Array<M>>,
noti_rx: AsyncRx<mpsc::One<()>>,
) where
W: WorkerAsync<Msg = M>,
RT: AsyncRuntime,
{
loop {
match noti_rx.recv_with_timer(RT::sleep(inner.timeout)).await {
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return,
Ok(_) => {
if inner.worker_count() < inner.max_workers {
inner.clone().spawn_async_worker::<W, RT, _>(
None,
worker.clone(),
true,
rx.clone(),
);
RT::sleep(inner.spawn_inv).await;
}
}
}
}
}
fn watcher_blocking<W>(
inner: Arc<WorkerPoolInner>, worker: W, rx: MRx<mpmc::Array<M>>, noti_rx: Rx<mpsc::One<()>>,
) where
W: WorkerBlocking<Msg = M>,
{
loop {
match noti_rx.recv_timeout(Duration::from_millis(500)) {
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return,
Ok(_) => {
if inner.worker_count() < inner.max_workers {
inner.clone().spawn_blocking_worker::<W, _>(
worker.clone(),
true,
rx.clone(),
);
thread::sleep(Duration::from_millis(500));
}
}
}
}
}
}