use std::io;
use std::sync::{atomic, Arc, Mutex};
use slab::Slab;
use std::os::unix::io::AsRawFd;
use {Listener, PoolHandle, NO_EXIT};
use worker::worker_main;
use poll::{Poll, Token};
pub struct PoolBuilder<L, C, S, R>
where
L: Listener,
{
pub(super) listener: Arc<L>,
pub(super) epoch: Arc<atomic::AtomicUsize>,
pub(super) exit: Arc<atomic::AtomicUsize>,
pub(super) poll: Arc<Poll>,
pub(super) initial: S,
pub(super) adapter: Arc<Fn(L::Connection) -> C + 'static + Send + Sync>,
pub(super) finalizer: Arc<Fn(S) -> R + Send + Sync + 'static>,
thread_name_prefix: String,
}
impl<L> PoolBuilder<L, L::Connection, (), ()>
where
L: Listener,
{
pub fn from(listener: L) -> io::Result<Self> {
let poll = Poll::new()?;
poll.register(&listener, Token(0))?;
Ok(PoolBuilder {
listener: Arc::new(listener),
epoch: Arc::new(atomic::AtomicUsize::new(1)),
poll: Arc::new(poll),
exit: Arc::new(atomic::AtomicUsize::new(NO_EXIT)),
initial: (),
adapter: Arc::new(|c| c),
finalizer: Arc::new(|_| ()),
thread_name_prefix: String::from("pool-"),
})
}
}
impl<L, C> PoolBuilder<L, C, (), ()>
where
L: Listener,
{
pub fn with_state<S>(self, initial: S) -> PoolBuilder<L, C, S, ()>
where
S: Clone + Send + 'static,
{
PoolBuilder {
listener: self.listener,
epoch: self.epoch,
exit: self.exit,
poll: self.poll,
adapter: self.adapter,
thread_name_prefix: self.thread_name_prefix,
initial,
finalizer: Arc::new(|_| ()),
}
}
}
impl<L, C, S, R> PoolBuilder<L, C, S, R>
where
L: Listener,
{
pub fn set_thread_name_prefix(mut self, prefix: &str) -> Self {
self.thread_name_prefix = prefix.to_string();
self
}
pub fn with_adapter<NA, NC>(self, adapter: NA) -> PoolBuilder<L, NC, S, R>
where
NA: Fn(L::Connection) -> NC + 'static + Send + Sync,
NC: AsRawFd + Send + 'static,
{
PoolBuilder {
listener: self.listener,
epoch: self.epoch,
exit: self.exit,
poll: self.poll,
initial: self.initial,
finalizer: self.finalizer,
thread_name_prefix: self.thread_name_prefix,
adapter: Arc::new(adapter),
}
}
pub fn and_return<NF, NR>(self, fin: NF) -> PoolBuilder<L, C, S, NR>
where
NF: Fn(S) -> NR + Send + Sync + 'static,
NR: Send + 'static,
{
PoolBuilder {
listener: self.listener,
epoch: self.epoch,
exit: self.exit,
poll: self.poll,
adapter: self.adapter,
initial: self.initial,
thread_name_prefix: self.thread_name_prefix,
finalizer: Arc::new(fin),
}
}
}
impl<L, C> PoolBuilder<L, C, (), ()>
where
L: Listener + 'static,
C: AsRawFd + Send + 'static,
{
pub fn run_stateless<E>(self, workers: usize, on_ready: E) -> PoolHandle<()>
where
E: Fn(&mut C) -> io::Result<bool> + 'static + Send + Sync,
{
self.run(workers, move |c, _| on_ready(c))
}
}
impl<L, C, S, R> PoolBuilder<L, C, S, R>
where
L: Listener + 'static,
C: AsRawFd + Send + 'static,
S: Clone + Send + 'static,
R: 'static + Send,
{
pub fn run<E>(self, workers: usize, on_ready: E) -> PoolHandle<R>
where
E: Fn(&mut C, &mut S) -> io::Result<bool> + 'static + Send + Sync,
{
let truth = Arc::new(Mutex::new(Slab::new()));
let on_ready = Arc::new(on_ready);
let wrkrs: Vec<_> = (0..workers)
.map(|i| {
worker_main(
&*self.thread_name_prefix,
i,
&self,
Arc::clone(&truth),
Arc::clone(&on_ready),
)
})
.collect();
PoolHandle {
threads: wrkrs,
exit: self.exit,
}
}
}