use super::{PoolBehavior, ChannelToucher,
ChannelToucherMut, RunningTask, ScheduleAlgorithm};
use channel::{Channel, BitAssigner, NotEnoughBits};
use std::sync::Arc;
use std::convert::Into;
use std::thread::JoinHandle;
use std::thread;
use std::ops::Deref;
use std::mem;
use std::any::Any;
use std::time::Duration as StdDuration;
use atomicmonitor::AtomMonitor;
use atomicmonitor::atomic::{Atomic, Ordering};
use atom::Atom;
use futures::{task, Async};
use futures::future::Future;
use futures::executor::{Notify, NotifyHandle};
use monitor::Monitor;
use time::Duration;
use stopwatch::Stopwatch;
pub struct Pool<Behavior: PoolBehavior> {
behavior: Behavior,
lifecycle_state: Atomic<LifecycleState>,
present_field: Arc<AtomMonitor<u64>>,
levels: Vec<Level<Behavior>>,
levels_shutdown: Vec<Level<Behavior>>,
complete_shutdown_mask: u64,
close_counter: AtomMonitor<u64>,
}
pub struct OwnedPool<Behavior: PoolBehavior> {
pub pool: Arc<Pool<Behavior>>,
pub join: PoolJoinHandle,
workers: Vec<JoinHandle<()>>,
}
#[derive(Clone)]
pub struct PoolJoinHandle {
completion: Arc<Monitor<bool>>
}
impl PoolJoinHandle {
pub fn join(&self) {
self.completion.with_lock(|mut guard| {
while !*guard {
guard.wait();
}
})
}
}
struct Level<Behavior: PoolBehavior> {
mask: u64,
channel_index: Atomic<usize>,
channels: Vec<ChannelIdentifier<Behavior::ChannelKey>>
}
#[derive(Copy, Clone)]
struct ChannelIdentifier<Key> {
key: Key,
mask: u64
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum LifecycleState {
Running,
Closed
}
impl<Behavior: PoolBehavior> Deref for Pool<Behavior> {
type Target = Behavior;
fn deref(&self) -> &<Self as Deref>::Target {
&self.behavior
}
}
#[derive(Debug)]
pub enum NewPoolError {
Over64Channels,
InvalidChannelIndex,
WrongNumberTimeslices {
num_levels: usize,
num_timeslices: usize,
},
NegativeTimeSlice {
index: usize,
duration: Duration,
}
}
impl<Behavior: PoolBehavior> OwnedPool<Behavior> {
pub fn new(mut behavior: Behavior) -> Result<Self, NewPoolError> {
let config = behavior.config();
let mut levels = Vec::new();
let mut levels_shutdown = Vec::new();
let mut complete_shutdown_mask = 0;
let present_field = Arc::new(AtomMonitor::new(0));
let mut current_bit = 0;
{
let mut bit_assigner = BitAssigner::new(&present_field, &mut current_bit);
for channel_param_vec in &config.levels {
let mut level = Level {
mask: 0,
channel_index: Atomic::new(0),
channels: Vec::new(),
};
let mut level_shutdown = Level {
mask: 0,
channel_index: Atomic::new(0),
channels: Vec::new(),
};
for channel_params in channel_param_vec {
let bit_from = bit_assigner.current_index();
behavior.touch_channel_mut(channel_params.key, AssignChannelBits(&mut bit_assigner))
.map_err(|_| NewPoolError::Over64Channels)?;
let bit_until = bit_assigner.current_index();
let mut channel_mask: u64 = 0;
for bit in bit_from..bit_until {
channel_mask |= 0x1 << bit;
}
level.mask |= channel_mask;
level.channels.push(ChannelIdentifier {
key: channel_params.key,
mask: channel_mask
});
if channel_params.complete_on_close {
level_shutdown.mask |= channel_mask;
level_shutdown.channels.push(ChannelIdentifier {
key: channel_params.key,
mask: channel_mask,
});
complete_shutdown_mask |= channel_mask;
}
}
levels.push(level);
levels_shutdown.push(level_shutdown);
}
}
match config.schedule {
ScheduleAlgorithm::HighestFirst => (),
ScheduleAlgorithm::RoundRobin(ref time_slices) => {
if time_slices.len() != levels.len() {
return Err(NewPoolError::WrongNumberTimeslices {
num_levels: levels.len(),
num_timeslices: time_slices.len(),
});
}
}
};
let pool = Arc::new(Pool {
behavior,
lifecycle_state: Atomic::new(LifecycleState::Running),
present_field,
levels,
levels_shutdown,
complete_shutdown_mask,
close_counter: AtomMonitor::new(0)
});
let mut workers = Vec::new();
match config.schedule {
ScheduleAlgorithm::HighestFirst => {
for _ in 0..config.threads {
let pool = pool.clone();
let worker = thread::spawn(move || work_highest_first(pool));
workers.push(worker);
}
},
ScheduleAlgorithm::RoundRobin(time_slices) => {
let mut std_time_slices = Vec::new();
for (i, duration) in time_slices.into_iter().enumerate() {
match duration.to_std() {
Ok(std_duration) => {
std_time_slices.push(std_duration);
},
Err(_) => {
return Err(NewPoolError::NegativeTimeSlice {
index: i,
duration,
});
}
};
}
let pool = pool.clone();
let worker = thread::spawn(move || work_round_robin(pool, std_time_slices));
workers.push(worker);
}
};
Ok(OwnedPool {
pool,
join: PoolJoinHandle {
completion: Arc::new(Monitor::new(false))
},
workers,
})
}
#[must_use]
pub fn close(self) -> PoolClose {
self.pool.lifecycle_state.compare_exchange(
LifecycleState::Running,
LifecycleState::Closed,
Ordering::SeqCst, Ordering::SeqCst
).expect("Illegal lifecycle state");
self.pool.present_field.notify_all();
PoolClose {
handles: Some(self.workers),
completed: Arc::new(Atom::empty()),
join: self.join.clone(),
}
}
}
pub struct PoolClose {
handles: Option<Vec<JoinHandle<()>>>,
completed: Arc<Atom<Box<Result<(), Box<dyn Any + Send + 'static>>>>>,
join: PoolJoinHandle,
}
impl Future for PoolClose {
type Item = ();
type Error = Box<dyn Any + Send + 'static>;
fn poll(&mut self) -> Result<Async<<Self as Future>::Item>, <Self as Future>::Error> {
if let Some(result) = self.completed.take() {
self.join.completion.with_lock(|mut guard| {
*guard = true;
guard.notify_all();
});
result.map(|()| Async::Ready(()))
} else if self.handles.is_some() {
let handles = self.handles.take().unwrap();
let completed = self.completed.clone();
let task = task::current();
thread::spawn(move || {
for handle in handles {
match handle.join() {
Ok(()) => (),
Err(e) => {
completed.set_if_none(Box::new(Err(e)));
task.notify();
return;
}
};
completed.set_if_none(Box::new(Ok(())));
task.notify();
}
});
Ok(Async::NotReady)
} else {
Ok(Async::NotReady)
}
}
}
fn work_highest_first<Behavior: PoolBehavior>(pool: Arc<Pool<Behavior>>) {
'work: while pool.lifecycle_state.load(Ordering::Acquire) == LifecycleState::Running {
pool.present_field.wait_until(|field| {
field != 0x0 ||
pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running
});
let present_field_capture = pool.present_field.get();
if let Some(level) = pool.levels.iter()
.find(|level| (level.mask & present_field_capture) != 0x0) {
let (task, from) = 'find_task: loop {
let level_channel_index = level.channel_index.fetch_add(1, Ordering::SeqCst)
% level.channels.len();
let channel_identifier = level.channels[level_channel_index];
if let Some(task) = pool.behavior.touch_channel(channel_identifier.key, PollChannel) {
break 'find_task (task, channel_identifier);
}
if (pool.present_field.get() & level.mask) == 0x0 {
continue 'work;
}
if pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running {
break 'work;
}
};
run::run(&pool, task, from);
}
}
close(pool);
}
fn work_round_robin<Behavior: PoolBehavior>(pool: Arc<Pool<Behavior>>, time_slices: Vec<StdDuration>) {
'work: while pool.lifecycle_state.load(Ordering::Acquire) == LifecycleState::Running {
pool.present_field.wait_until(|field| {
field != 0x0 ||
pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running
});
'levels: for (level_index, level) in pool.levels.iter().enumerate() {
let timer = Stopwatch::start_new();
while
!({
(pool.present_field.get() & level.mask) == 0x0
} || {
timer.elapsed() >= time_slices[level_index]
} || {
pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running
}) {
let remaining_time = Duration::from_std(time_slices[level_index]).unwrap() -
Duration::from_std(timer.elapsed()).unwrap();
pool.present_field.wait_until_timeout(
|mask| mask != 0x0,
remaining_time
);
if pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running {
break 'work;
}
if timer.elapsed() >= time_slices[level_index] ||
(pool.present_field.get() & level.mask) == 0x0 {
continue 'levels;
}
let (task, from) = 'find_task: loop {
let level_channel_index = level.channel_index.fetch_add(1, Ordering::SeqCst)
% level.channels.len();
let channel_identifier = level.channels[level_channel_index];
if let Some(task) = pool.behavior.touch_channel(channel_identifier.key, PollChannel) {
break 'find_task (task, channel_identifier);
}
if (pool.present_field.get() & level.mask) == 0x0 {
continue 'levels;
}
if pool.lifecycle_state.load(Ordering::Acquire) != LifecycleState::Running {
break 'work;
}
};
run::run(&pool, task, from);
}
}
}
close(pool);
}
fn close<Behavior: PoolBehavior>(pool: Arc<Pool<Behavior>>) {
let should_break_close = || {
if (pool.present_field.get() & pool.complete_shutdown_mask) == 0x0 {
pool.present_field.wait_until(|field| {
pool.close_counter.get() == 0 || (field & pool.complete_shutdown_mask) != 0x0
});
(pool.present_field.get() & pool.complete_shutdown_mask) == 0x0
} else {
false
}
};
'close: loop {
let present_field_capture = pool.present_field.get();
if should_break_close() {
break 'close;
}
if let Some(level) = pool.levels_shutdown.iter()
.find(|level| (level.mask & present_field_capture) != 0x0) {
let (task, from) = 'find_task: loop {
let level_channel_index = level.channel_index.fetch_add(1, Ordering::SeqCst)
% level.channels.len();
let channel_identifier = level.channels[level_channel_index];
if let Some(task) = pool.behavior.touch_channel(channel_identifier.key, PollChannel) {
break 'find_task (task, channel_identifier);
}
if should_break_close() {
break 'close;
} else if (pool.present_field.get() & pool.complete_shutdown_mask & level.mask) == 0x0 {
continue 'close;
}
};
run::run(&pool, task, from);
}
};
}
mod run {
use super::*;
pub (super) fn run<Behavior: PoolBehavior>(
pool: &Arc<Pool<Behavior>>,
task: RunningTask,
from: ChannelIdentifier<Behavior::ChannelKey>
) {
unsafe {
let task: *mut RunningTask = Box::into_raw(Box::new(task));
run_helper(pool, task, from)
};
}
unsafe fn run_helper<Behavior: PoolBehavior>(
pool: &Arc<Pool<Behavior>>,
task: *mut RunningTask,
from: ChannelIdentifier<Behavior::ChannelKey>,
) {
let status: Arc<RunStatus> = Arc::new(RunStatus {
responsibility: Atomic::new(ResponsibilityStatus::NotRequestedAndWillBeTakenCareOf),
reinserted: Atomic::new(false),
});
match (*task).spawn.poll_future_notify(&IntoAtomicFollowup {
pool,
from,
status: &status,
task
}, 0) {
Ok(Async::NotReady) => {
match status.responsibility.compare_exchange(
ResponsibilityStatus::NotRequestedAndWillBeTakenCareOf,
ResponsibilityStatus::NotRequestedAndWillNotBeTakenCareOf,
Ordering::SeqCst, Ordering::SeqCst
) {
Ok(_) => {
if (from.mask & pool.complete_shutdown_mask) != 0x0 {
if let Ok(_) = (&*task).close_counted.compare_exchange(
false, true, Ordering::SeqCst, Ordering::SeqCst) {
pool.close_counter.mutate(|counter| {
counter.fetch_add(1, Ordering::SeqCst);
});
}
}
},
Err(ResponsibilityStatus::RequestedAndWillBeTakenCareOf) => {
run_helper(pool, task, from);
},
Err(other) => panic!("Illegal run status: {:?}", other)
};
},
_ => {
if (&*task).close_counted.load(Ordering::Acquire) {
pool.close_counter.mutate(|counter| {
counter.fetch_sub(1, Ordering::SeqCst);
});
if pool.lifecycle_state.load(Ordering::Acquire) == LifecycleState::Closed {
pool.present_field.notify_all();
}
}
mem::drop(Box::from_raw(task));
}
};
}
#[repr(u8)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum ResponsibilityStatus {
NotRequestedAndWillBeTakenCareOf,
RequestedAndWillBeTakenCareOf,
NotRequestedAndWillNotBeTakenCareOf,
}
struct RunStatus {
responsibility: Atomic<ResponsibilityStatus>,
reinserted: Atomic<bool>,
}
pub struct AtomicFollowup<Behavior: PoolBehavior> {
pool: Arc<Pool<Behavior>>,
from: ChannelIdentifier<Behavior::ChannelKey>,
status: Arc<RunStatus>,
task: *mut RunningTask
}
impl<Behavior: PoolBehavior> Notify for AtomicFollowup<Behavior> {
fn notify(&self, _: usize) {
match self.status.responsibility.compare_exchange(
ResponsibilityStatus::NotRequestedAndWillBeTakenCareOf,
ResponsibilityStatus::RequestedAndWillBeTakenCareOf,
Ordering::SeqCst, Ordering::SeqCst
) {
Err(ResponsibilityStatus::NotRequestedAndWillNotBeTakenCareOf) => unsafe {
if !self.status.reinserted.swap(true, Ordering::SeqCst) {
let task: RunningTask = *Box::from_raw(self.task);
self.pool.behavior.followup(self.from.key, task);
}
},
Ok(ResponsibilityStatus::NotRequestedAndWillBeTakenCareOf) => (),
Err(ResponsibilityStatus::RequestedAndWillBeTakenCareOf) => (),
invalid => panic!("Invalid atomic followup CAS result: {:#?}", invalid)
};
}
}
unsafe impl<Behavior: PoolBehavior> Send for AtomicFollowup<Behavior> {}
unsafe impl<Behavior: PoolBehavior> Sync for AtomicFollowup<Behavior> {}
pub struct IntoAtomicFollowup<'a, 'b, Behavior: PoolBehavior> {
pool: &'a Arc<Pool<Behavior>>,
from: ChannelIdentifier<Behavior::ChannelKey>,
status: &'b Arc<RunStatus>,
task: *mut RunningTask
}
impl<'a, 'b, Behavior: PoolBehavior> Into<NotifyHandle> for IntoAtomicFollowup<'a, 'b, Behavior> {
fn into(self) -> NotifyHandle {
NotifyHandle::from(Arc::new(AtomicFollowup {
pool: self.pool.clone(),
from: self.from,
status: self.status.clone(),
task: self.task
}))
}
}
impl<'a, 'b, Behavior: PoolBehavior> Clone for IntoAtomicFollowup<'a, 'b, Behavior> {
fn clone(&self) -> Self {
IntoAtomicFollowup {
..*self
}
}
}
}
struct AssignChannelBits<'a, 'b: 'a, 'c: 'a>(&'a mut BitAssigner<'b, 'c>);
impl<'a, 'b: 'a, 'c: 'a> ChannelToucherMut<Result<(), NotEnoughBits>> for AssignChannelBits<'a, 'b, 'c> {
fn touch_mut(&mut self, channel: &mut impl Channel) -> Result<(), NotEnoughBits> {
let &mut AssignChannelBits(ref mut assigner) = self;
channel.assign_bits(assigner)
}
}
struct PollChannel;
impl ChannelToucher<Option<RunningTask>> for PollChannel {
fn touch(&mut self, channel: & impl Channel) -> Option<RunningTask> {
channel.poll()
}
}