use super::job::*;
use super::unsafe_job::*;
use super::scheduler_thread::*;
use std::mem;
use std::fmt;
use std::sync::*;
use std::collections::vec_deque::*;
use num_cpus;
use futures::future;
use futures::sync::oneshot;
use futures::future::Future;
/// Lower bound on the worker-thread pool size used when computing the
/// initial max_threads (see initial_max_threads)
const MIN_THREADS: usize = 8;
lazy_static! {
    /// The process-wide scheduler singleton, created lazily on first use
    /// (accessed via the scheduler() free function)
    static ref SCHEDULER: Arc<Scheduler> = Arc::new(Scheduler::new());
}
/// Computes the default thread limit for a new scheduler: twice the number
/// of logical CPUs, but never fewer than MIN_THREADS
fn initial_max_threads() -> usize {
    let cpu_based = num_cpus::get() * 2;

    if cpu_based > MIN_THREADS {
        cpu_based
    } else {
        MIN_THREADS
    }
}
/// Distributes jobs from a set of job queues across a pool of worker threads
pub struct Scheduler {
    /// Queues that have pending jobs and are waiting for a thread to drain them
    schedule: Arc<Mutex<VecDeque<Arc<JobQueue>>>>,

    /// The worker threads, each paired with its 'busy' flag
    threads: Mutex<Vec<(Arc<Mutex<bool>>, SchedulerThread)>>,

    /// Upper bound on the number of worker threads that will be spawned
    max_threads: Mutex<usize>
}
/// The lifecycle states of a JobQueue
#[derive(PartialEq, Debug, Clone, Copy)]
enum QueueState {
    /// No jobs queued and not scheduled to run
    Idle,

    /// Has jobs and is on the schedule waiting for a thread
    Pending,

    /// A thread is currently draining this queue
    Running,

    /// A suspension was requested; the queue stops handing out jobs and
    /// will come to rest in the Suspended state
    Suspending,

    /// The queue is at rest and will not run jobs until resumed
    Suspended
}
/// The mutable interior of a JobQueue, always accessed under its mutex
struct JobQueueCore {
    /// Jobs waiting to run on this queue, in FIFO order
    queue: VecDeque<Box<ScheduledJob>>,

    /// Where this queue is in its lifecycle
    state: QueueState,

    /// Number of outstanding suspensions (suspend() increments,
    /// resume() decrements; the queue runs only when this is <= 0)
    suspension_count: i32
}
/// A queue of jobs that run one at a time, in order, on the scheduler
pub struct JobQueue {
    /// The queue's state, guarded by a mutex so threads and the scheduler
    /// can coordinate
    core: Mutex<JobQueueCore>
}
impl fmt::Debug for JobQueue {
    /// Formats the queue's current state and pending job count for diagnostics
    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        // Lock the core so state and length are read consistently
        let core = self.core.lock().expect("JobQueue core lock");

        // write! formats straight into the formatter, avoiding the
        // intermediate String that format!() + write_str() allocated
        write!(fmt, "JobQueue: State: {:?}, Pending: {}", core.state, core.queue.len())
    }
}
impl JobQueue {
    /// Creates a new, empty job queue in the Idle state
    fn new() -> JobQueue {
        JobQueue {
            core: Mutex::new(JobQueueCore {
                queue: VecDeque::new(),
                state: QueueState::Idle,
                suspension_count: 0
            })
        }
    }

    /// Removes and returns the next job to run, or None if the queue is
    /// empty or a suspension has been requested (a Suspending queue stops
    /// handing out work so it can come to rest)
    fn dequeue(&self) -> Option<Box<ScheduledJob>> {
        let mut core = self.core.lock().expect("JobQueue core lock");

        if core.state == QueueState::Suspending {
            // Stop draining once a suspension is in progress
            None
        } else {
            // Only a Running queue should be asked for work
            debug_assert!(core.state == QueueState::Running);
            core.queue.pop_front()
        }
    }

    /// Runs jobs from this queue until it is empty (or suspending), then
    /// moves the queue to its resting state: Idle normally, or Suspended
    /// if a suspension was requested while draining
    fn drain(&self) {
        debug_assert!(self.core.lock().unwrap().state == QueueState::Running);

        let mut done = false;

        while !done {
            // Run everything currently queued
            while let Some(mut job) = self.dequeue() {
                debug_assert!(self.core.lock().unwrap().state == QueueState::Running);
                job.run();
            }

            {
                let mut core = self.core.lock().expect("JobQueue core lock");
                debug_assert!(core.state == QueueState::Running || core.state == QueueState::Suspending);

                // Only finish if no new job arrived between the last dequeue
                // returning None and us re-taking the lock; otherwise loop
                // again and run the late arrivals
                if core.queue.len() == 0 {
                    core.state = match core.state {
                        QueueState::Running => QueueState::Idle,
                        QueueState::Suspending => QueueState::Suspended,
                        x => x
                    };
                    done = true;
                }
            }
        }
    }
}
impl Scheduler {
    /// Creates a scheduler with an empty schedule, no worker threads yet
    /// (threads are spawned lazily by schedule_thread) and the default
    /// thread limit
    pub fn new() -> Scheduler {
        let result = Scheduler {
            schedule: Arc::new(Mutex::new(VecDeque::new())),
            threads: Mutex::new(vec![]),
            max_threads: Mutex::new(initial_max_threads())
        };

        result
    }

    /// Changes the maximum number of worker threads, then hands out work
    /// while any thread (existing or newly spawnable) can take it.
    ///
    /// Note: lowering the limit does not stop existing threads; use
    /// despawn_threads_if_overloaded for that.
    pub fn set_max_threads(&self, max_threads: usize) {
        // Inner scope releases the lock before we start scheduling
        { *self.max_threads.lock().expect("Max threads lock") = max_threads };

        while self.schedule_thread() {}
    }

    /// Stops worker threads until the pool is back under max_threads
    pub fn despawn_threads_if_overloaded(&self) {
        let max_threads = { *self.max_threads.lock().expect("Max threads lock") };
        let to_despawn = {
            // Pop the excess threads while holding the pool lock, but
            // collect the join handles so the blocking joins happen after
            // the lock is released
            let mut to_despawn = vec![];
            let mut threads = self.threads.lock().expect("Scheduler threads lock");

            while threads.len() > max_threads {
                to_despawn.push(threads.pop().expect("Missing threads").1.despawn());
            }

            to_despawn
        };

        // Wait for the despawned threads to finish whatever they're running
        to_despawn.into_iter().for_each(|join_handle| { join_handle.join().ok(); });
    }

    /// Pops queues off the schedule until one that is still Pending is
    /// found; marks it Running and returns it. Queues in any other state
    /// are simply dropped from the schedule (they re-add themselves when
    /// they have work again)
    fn next_to_run(schedule: &Arc<Mutex<VecDeque<Arc<JobQueue>>>>) -> Option<Arc<JobQueue>> {
        let mut schedule = schedule.lock().expect("Schedule lock");

        while let Some(q) = schedule.pop_front() {
            let mut core = q.core.lock().expect("JobQueue core lock");

            if core.state == QueueState::Pending {
                core.state = QueueState::Running;
                return Some(q.clone());
            }
        }

        None
    }

    /// Attempts to hand a work loop to an idle ('dormant') thread.
    ///
    /// next_job produces the next piece of work (None when there is no
    /// more) and job runs one piece. Returns true if an idle thread took
    /// the loop, false if every thread is busy.
    fn schedule_dormant<NextJob, RunJob, JobData>(&self, next_job: NextJob, job: RunJob) -> bool
    where RunJob: 'static+Send+Fn(JobData) -> (), NextJob: 'static+Send+Fn() -> Option<JobData> {
        let threads = self.threads.lock().expect("Scheduler threads lock");

        for &(ref busy_rc, ref thread) in threads.iter() {
            let mut busy = busy_rc.lock().expect("Thread busy lock");

            if !*busy {
                // Claim this thread before releasing its busy lock
                let also_busy = busy_rc.clone();
                *busy = true;

                thread.run(Job::new(move || {
                    let mut done = false;

                    while !done {
                        // Fetch the next job while holding the busy flag:
                        // the flag is cleared in the same critical section
                        // that observes 'no more work', so a concurrent
                        // scheduler can't see a busy thread that is about
                        // to stop
                        let job_data = {
                            let mut busy = also_busy.lock().expect("Thread busy lock");
                            let job_data = next_job();

                            if job_data.is_none() {
                                *busy = false;
                            }

                            job_data
                        };

                        if let Some(job_data) = job_data {
                            job(job_data);
                        } else {
                            done = true;
                        }
                    }
                }));

                return true;
            }
        }

        false
    }

    /// Adds a worker thread to the pool if it is under the max_threads
    /// limit. Returns true if a thread was added
    fn spawn_thread_if_less_than_maximum(&self) -> bool {
        let max_threads = { *self.max_threads.lock().expect("Max threads lock") };
        let mut threads = self.threads.lock().expect("Scheduler threads lock");

        if threads.len() < max_threads {
            let is_busy = Arc::new(Mutex::new(false));
            let new_thread = SchedulerThread::new();
            threads.push((is_busy, new_thread));

            true
        } else {
            false
        }
    }

    /// Tries to start draining scheduled queues on some thread: first an
    /// idle thread, and if all are busy, a newly spawned one (up to
    /// max_threads). Returns false if no thread could take the work
    fn schedule_thread(&self) -> bool {
        let schedule = self.schedule.clone();

        if !self.schedule_dormant(move || Self::next_to_run(&schedule), move |work| work.drain()) {
            if self.spawn_thread_if_less_than_maximum() {
                // Retry with the newly spawned thread in the pool
                self.schedule_thread()
            } else {
                false
            }
        } else {
            true
        }
    }

    /// Puts a queue back on the schedule if it is Idle but still has
    /// pending jobs (e.g. jobs queued while a sync request was finishing)
    fn reschedule_queue(&self, queue: &Arc<JobQueue>) {
        let reschedule = {
            let mut core = queue.core.lock().expect("JobQueue core lock");

            if core.state == QueueState::Idle {
                if core.queue.len() > 0 {
                    // Work remains: mark Pending so next_to_run picks it up
                    core.state = QueueState::Pending;
                    true
                } else {
                    core.state = QueueState::Idle;
                    false
                }
            } else {
                false
            }
        };

        if reschedule {
            self.schedule.lock().expect("Schedule lock").push_back(queue.clone());
            self.schedule_thread();
        }
    }

    /// Unconditionally adds a worker thread to the pool (ignores the
    /// max_threads limit)
    pub fn spawn_thread(&self) {
        let is_busy = Arc::new(Mutex::new(false));
        let new_thread = SchedulerThread::new();

        self.threads.lock().expect("Scheduler threads lock").push((is_busy, new_thread));
    }

    /// Creates a new, idle job queue for use with this scheduler
    pub fn create_job_queue(&self) -> Arc<JobQueue> {
        let new_queue = Arc::new(JobQueue::new());
        new_queue
    }

    /// Queues a job to run asynchronously. Jobs on the same queue run in
    /// order, one at a time
    pub fn async<TFn: 'static+Send+FnOnce() -> ()>(&self, queue: &Arc<JobQueue>, job: TFn) {
        let schedule_queue = {
            let job = Job::new(job);
            let mut core = queue.core.lock().expect("JobQueue core lock");

            core.queue.push_back(Box::new(job));

            // Only an Idle queue needs to be (re)scheduled; any other
            // state means it is already scheduled, running or suspended
            if core.state == QueueState::Idle {
                core.state = QueueState::Pending;
                true
            } else {
                false
            }
        };

        if schedule_queue {
            self.schedule.lock().expect("Schedule lock").push_back(queue.clone());
            self.schedule_thread();
        }
    }

    /// Queues a job and returns a future resolving to its result (or
    /// oneshot::Canceled if the job is dropped without running)
    pub fn future<TFn, Item: 'static+Send>(&self, queue: &Arc<JobQueue>, job: TFn) -> Box<Future<Item=Item, Error=oneshot::Canceled>>
    where TFn: 'static+Send+FnOnce() -> Item {
        let (send, receive) = oneshot::channel();

        self.async(queue, move || {
            let res = job();
            // The receiver may have been dropped; ignore that case
            send.send(res).ok();
        });

        Box::new(receive)
    }

    /// Suspends the queue until `after` completes, then runs `job` with
    /// the future's result and resumes the queue. Jobs queued in the
    /// meantime wait behind the suspension.
    ///
    /// The returned future panics if the suspension itself is cancelled
    /// while waiting.
    pub fn after<'a, TFn, Item: 'static+Send, Error: 'static+Send, Res: 'static+Send, Fut: 'a+Future<Item=Item, Error=Error>>(&self, queue: &Arc<JobQueue>, after: Fut, job: TFn) -> Box<'a+Future<Item=Res, Error=Error>>
    where TFn: 'static+Send+FnOnce(Result<Item, Error>) -> Result<Res, Error> {
        // Join the caller's future with the queue suspension; each side's
        // error is tagged into a tuple so we can tell which one failed
        let after_suspend = self.suspend(queue).map_err(|e| (None, Some(e)));
        let after = after.map_err(|e| (Some(e), None))
            .join(after_suspend);

        let future_queue = queue.clone();

        let next_future = after.then(move |val| {
            // Unpack the tagged error back into the caller's error type
            let val = {
                match val {
                    Err((future_err, suspend_err)) => {
                        if let Some(_suspend_err) = suspend_err {
                            panic!("While waiting for a future: queue suspension was cancelled");
                        } else {
                            Err(future_err.expect("Both errors missing"))
                        }
                    },

                    Ok((val, _)) => Ok(val)
                }
            };

            // Run the continuation, then let the queue process jobs again
            // (note: resumes via the global scheduler, not self, because
            // this closure must be 'static)
            let result = job(val);
            scheduler().resume(&future_queue);

            future::result(result)
        });

        Box::new(next_future)
    }

    /// Queues a job that suspends the queue: once it reaches the front,
    /// the queue stops running further jobs until resume() is called. The
    /// returned future resolves when the suspension takes effect
    pub fn suspend(&self, queue: &Arc<JobQueue>) -> Box<Future<Item=(), Error=oneshot::Canceled>> {
        let (suspended, will_be_suspended) = oneshot::channel();
        let to_suspend = queue.clone();

        self.async(queue, move || {
            let mut core = to_suspend.core.lock().expect("JobQueue core lock");
            debug_assert!(core.state == QueueState::Running);

            core.suspension_count += 1;

            // The first suspension starts the Running -> Suspending ->
            // Suspended transition (drain() finishes it)
            if core.suspension_count == 1 {
                core.state = QueueState::Suspending;
            }

            // NOTE(review): after the increment above suspension_count is
            // always >= 1, so this send is effectively unconditional —
            // confirm whether the check is intentional
            if core.suspension_count > 0 {
                suspended.send(()).ok();
            }
        });

        Box::new(will_be_suspended)
    }

    /// Undoes one suspension. When the count drops to zero or below, a
    /// Suspended queue returns to Idle (and is rescheduled if it has
    /// work); a queue still Suspending just goes back to Running
    pub fn resume(&self, queue: &Arc<JobQueue>) {
        let needs_reschedule = {
            let mut core = queue.core.lock().expect("JobQueue core lock");
            core.suspension_count -= 1;

            if core.suspension_count <= 0 {
                match core.state {
                    QueueState::Suspended => {
                        core.state = QueueState::Idle;
                        true
                    },

                    QueueState::Suspending => {
                        // The drain loop never came to rest; it can carry on
                        core.state = QueueState::Running;
                        false
                    },

                    _ => false
                }
            } else {
                false
            }
        };

        if needs_reschedule {
            self.reschedule_queue(queue);
        }
    }

    /// Runs a sync job directly on the calling thread. Used when the queue
    /// was Idle so no queued work needs to run first
    fn sync_immediate<Result, TFn: FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
        debug_assert!(queue.core.lock().expect("JobQueue core lock").state == QueueState::Running);

        let result = job();

        // Release the queue and pick up anything queued while we ran
        queue.core.lock().expect("JobQueue core lock").state = QueueState::Idle;
        self.reschedule_queue(queue);

        result
    }

    /// Runs a sync job after draining the queue's pending work on the
    /// calling thread. Used when the queue was Pending: we claim it and
    /// run its jobs ourselves rather than wait for a worker thread
    fn sync_drain<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
        debug_assert!(queue.core.lock().expect("JobQueue core lock").state == QueueState::Running);

        // Shared result slot + condvar signalled when the job has run
        let result = Arc::new((Mutex::new(None), Condvar::new()));

        let queue_result = result.clone();
        let result_job = Box::new(Job::new(move || {
            let job_result = job();
            *queue_result.0.lock().expect("Sync queue result lock") = Some(job_result);
            queue_result.1.notify_one();
        }));

        // NOTE(review): UnsafeJob stores a raw reference to result_job; the
        // wait loops below don't return until the job has produced its
        // result, which presumably keeps that reference valid for the job's
        // lifetime — confirm against UnsafeJob's safety contract
        let unsafe_result_job = UnsafeJob::new(&*result_job);
        queue.core.lock().expect("JobQueue core lock").queue.push_back(Box::new(unsafe_result_job));

        // Drain the queue (including our job) on this thread
        while result.0.lock().expect("Sync queue result lock").is_none() {
            if let Some(mut job) = queue.dequeue() {
                debug_assert!(queue.core.lock().unwrap().state != QueueState::Suspended);
                job.run();
            } else {
                // dequeue returned None: either the queue started
                // suspending, or the result job has already run
                let wait_in_background = {
                    let mut core = queue.core.lock().expect("JobQueue core lock");
                    if core.state == QueueState::Suspending {
                        // Suspended under us: let the queue rest and wait
                        // for the result job to run after a resume
                        core.state = QueueState::Suspended;
                        true
                    } else {
                        debug_assert!(core.state == QueueState::Running);
                        false
                    }
                };

                if wait_in_background {
                    // NOTE(review): the result could be stored between the
                    // outer is_none() check and the wait() below (the lock
                    // is released in between) — the notify would then be
                    // missed; consider re-checking under the lock passed to
                    // wait()
                    while result.0.lock().expect("Sync queue result lock").is_none() {
                        let parking = &result.1;
                        let result = result.0.lock().unwrap();
                        let _result = parking.wait(result).unwrap();
                    }
                }
            }
        }

        // Queue is drained: back to Idle, rescheduling any late arrivals
        queue.core.lock().expect("JobQueue core lock").state = QueueState::Idle;
        self.reschedule_queue(queue);

        // Move the result out of the shared slot
        let mut final_result = None;
        let mut old_result = result.0.lock().expect("Sync queue result lock");
        mem::swap(&mut *old_result, &mut final_result);

        final_result.expect("Finished sync request without result")
    }

    /// Queues a sync job and blocks the calling thread until a background
    /// thread has run it. Used when the queue is already running (or
    /// suspended) elsewhere
    fn sync_background<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
        // Shared result slot + condvar the worker signals on completion
        let pair = Arc::new((Mutex::new(None), Condvar::new()));
        let pair2 = pair.clone();

        let job = Box::new(Job::new(move || {
            let &(ref result, ref cvar) = &*pair2;

            let actual_result = job();
            *result.lock().expect("Background job result lock") = Some(actual_result);
            cvar.notify_one();
        }));

        let need_reschedule = {
            // NOTE(review): as in sync_drain, UnsafeJob borrows the job
            // Box; this function blocks until the job has run, which
            // presumably upholds UnsafeJob's lifetime contract — confirm
            let unsafe_job = Box::new(UnsafeJob::new(&*job));

            let mut core = queue.core.lock().expect("JobQueue core lock");
            core.queue.push_back(unsafe_job);
            core.state == QueueState::Idle
        };
        if need_reschedule { self.reschedule_queue(queue); }

        // Standard condvar wait loop (handles spurious wakeups)
        let &(ref lock, ref cvar) = &*pair;
        let mut result = lock.lock().expect("Background job result lock");
        while result.is_none() {
            result = cvar.wait(result).expect("Background job cvar wait");
        }

        // Move the result out of the shared slot
        let mut final_result = None;
        mem::swap(&mut *result, &mut final_result);

        final_result.expect("Finished background sync job without result")
    }

    /// Runs a job synchronously, returning its result. The strategy
    /// depends on the queue's current state:
    ///   Idle    - run immediately on this thread
    ///   Pending - drain the queue on this thread, then run
    ///   other   - queue the job and block until a worker runs it
    pub fn sync<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
        enum RunAction {
            /// Run the job right away on the calling thread
            Immediate,
            /// Run pending jobs then the job, on the calling thread
            DrainOnThisThread,
            /// Wait for a background thread to run the job
            WaitForBackground
        }

        // Claim the queue (Idle/Pending -> Running) or decide to wait
        let run_action = {
            let mut core = queue.core.lock().expect("JobQueue core lock");

            match core.state {
                QueueState::Suspended => RunAction::WaitForBackground,
                QueueState::Suspending => RunAction::WaitForBackground,
                QueueState::Running => RunAction::WaitForBackground,
                QueueState::Pending => { core.state = QueueState::Running; RunAction::DrainOnThisThread },
                QueueState::Idle => { core.state = QueueState::Running; RunAction::Immediate }
            }
        };

        match run_action {
            RunAction::Immediate => self.sync_immediate(queue, job),
            RunAction::DrainOnThisThread => self.sync_drain(queue, job),
            RunAction::WaitForBackground => self.sync_background(queue, job)
        }
    }
}
impl fmt::Debug for Scheduler {
    /// Formats a thread busyness map ('B' = busy, 'I' = idle) followed by
    /// the number of queues waiting on the schedule
    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        // Build the per-thread busy/idle string while holding the pool lock
        let busyness: String = {
            let threads = self.threads.lock().expect("Scheduler threads lock");
            threads.iter()
                .map(|&(ref busy, _)| { if *busy.lock().expect("Thread busy lock") { 'B' } else { 'I' } })
                .collect()
        };

        let pending = self.schedule.lock().expect("Schedule lock").len();

        // write! formats directly into the formatter, avoiding the two
        // intermediate Strings the format!/write_str version allocated;
        // output is identical: "<busyness> Pending queue count: <n>"
        write!(fmt, "{} Pending queue count: {}", busyness, pending)
    }
}
/// Returns a reference to the process-wide scheduler singleton
pub fn scheduler<'a>() -> &'a Scheduler {
    // Deref-coerces through the lazy_static wrapper and the Arc; SCHEDULER
    // lives for the whole program, so the reference can be handed out at
    // any caller-chosen lifetime 'a
    &SCHEDULER
}
/// Creates a fresh, idle job queue on the global scheduler
pub fn queue() -> Arc<JobQueue> {
    let global_scheduler = scheduler();

    global_scheduler.create_job_queue()
}
/// Queues `job` to run asynchronously on `queue`, via the global scheduler.
/// Jobs on the same queue run one at a time, in order.
pub fn async<TFn: 'static+Send+FnOnce() -> ()>(queue: &Arc<JobQueue>, job: TFn) {
    let global_scheduler = scheduler();

    global_scheduler.async(queue, job)
}
/// Queues `job` on the global scheduler and returns a future resolving to
/// its result (or oneshot::Canceled if the job never runs)
pub fn future<TFn, Item: 'static+Send>(queue: &Arc<JobQueue>, job: TFn) -> Box<Future<Item=Item, Error=oneshot::Canceled>>
where TFn: 'static+Send+FnOnce() -> Item {
    let global_scheduler = scheduler();

    global_scheduler.future(queue, job)
}
/// Runs `job` synchronously on `queue` via the global scheduler, blocking
/// the calling thread until it completes, and returns its result
pub fn sync<Result: Send, TFn: Send+FnOnce() -> Result>(queue: &Arc<JobQueue>, job: TFn) -> Result {
    let global_scheduler = scheduler();

    global_scheduler.sync(queue, job)
}