use super::core::*;
use super::job::*;
use super::future_job::*;
use super::unsafe_job::*;
use super::scheduler_thread::*;
use super::job_queue::*;
use super::queue_state::*;
use super::active_queue::*;
use super::sync_future::*;
use super::scheduler_future::*;
use super::queue_resumer::*;
use super::try_sync_error::*;
use std::fmt;
use std::sync::*;
use std::collections::vec_deque::*;
use std::result::{Result};
use futures::prelude::*;
use futures::channel::oneshot;
use futures::future::{Future};
#[cfg(not(target_arch = "wasm32"))]
use num_cpus;
// Lower bound on the scheduler's maximum thread count on native targets
// (wasm32 has no thread pool at all, so no minimum applies there)
#[cfg(not(target_arch = "wasm32"))]
const MIN_THREADS: usize = 8;
lazy_static! {
// The process-wide scheduler instance, created lazily on first use and
// returned by the `scheduler()` function
static ref SCHEDULER: Arc<Scheduler> = Arc::new(Scheduler::new());
}
/// Computes the default thread-pool limit for native targets: twice the
/// number of logical CPUs, but never less than `MIN_THREADS`.
#[cfg(not(target_arch = "wasm32"))]
fn initial_max_threads() -> usize {
    let cpu_based = num_cpus::get() * 2;
    if cpu_based > MIN_THREADS { cpu_based } else { MIN_THREADS }
}

/// On wasm32 no background threads can be spawned, so the limit is 0.
#[cfg(target_arch = "wasm32")]
fn initial_max_threads() -> usize {
    0
}
/// Schedules jobs from job queues onto a pool of background threads, and
/// provides synchronous and future-based ways to run jobs on a queue.
pub struct Scheduler {
// Shared state (pending-queue schedule, thread pool, thread limit),
// also handed to threads and futures via `Arc` clones
pub (super) core: Arc<SchedulerCore>
}
impl Scheduler {
/// Creates a new scheduler with an empty schedule, no pool threads and
/// the default thread limit for this target.
pub fn new() -> Scheduler {
let core = SchedulerCore {
schedule: Arc::new(Mutex::new(VecDeque::new())),
threads: Mutex::new(vec![]),
max_threads: Mutex::new(initial_max_threads())
};
Scheduler {
core: Arc::new(core)
}
}
/// Changes the maximum number of threads the scheduler may run, then
/// keeps dispatching threads while that succeeds (so newly-allowed
/// threads pick up any pending queues).
#[cfg(not(target_arch = "wasm32"))]
pub fn set_max_threads(&self, max_threads: usize) {
// Update the limit in its own scope so the lock is released before
// any scheduling happens
{ *self.core.max_threads.lock().expect("Max threads lock") = max_threads };
while self.schedule_thread() {}
}
/// On wasm32 there is no thread pool, so this is a no-op.
#[cfg(target_arch = "wasm32")]
pub fn set_max_threads(&self, max_threads: usize) {
}
/// Despawns threads while the pool is larger than the current
/// `max_threads` limit, then joins each despawned thread.
pub fn despawn_threads_if_overloaded(&self) {
let max_threads = { *self.core.max_threads.lock().expect("Max threads lock") };
let to_despawn = {
// Collect join handles while holding the thread-list lock, but
// join them after releasing it so other callers are not blocked
let mut to_despawn = vec![];
let mut threads = self.core.threads.lock().expect("Scheduler threads lock");
while threads.len() > max_threads {
to_despawn.push(threads.pop().expect("Missing threads").1.despawn());
}
to_despawn
};
to_despawn.into_iter().for_each(|join_handle| { join_handle.join().ok(); });
}
/// Delegates to `SchedulerCore::schedule_thread`; returns whether a
/// thread was scheduled.
fn schedule_thread(&self) -> bool {
self.core.schedule_thread(Arc::clone(&self.core))
}
/// Delegates to `SchedulerCore::reschedule_queue` to put a queue back
/// into consideration for scheduling.
fn reschedule_queue(&self, queue: &Arc<JobQueue>) {
self.core.reschedule_queue(queue, Arc::clone(&self.core))
}
/// Adds a new thread to the pool, initially marked as not busy.
pub fn spawn_thread(&self) {
let is_busy = Arc::new(Mutex::new(false));
let new_thread = SchedulerThread::new();
self.core.threads.lock().expect("Scheduler threads lock").push((is_busy, new_thread));
}
/// Creates a new, empty job queue that jobs can be scheduled on.
pub fn create_job_queue(&self) -> Arc<JobQueue> {
let new_queue = Arc::new(JobQueue::new());
new_queue
}
/// Deprecated alias of `desync` (`async` became a reserved word in
/// Rust 2018, so the operation was renamed).
#[inline]
#[deprecated(since="0.3.0", note="please use `desync` instead")]
pub fn r#async<TFn: 'static+Send+FnOnce() -> ()>(&self, queue: &Arc<JobQueue>, job: TFn) {
self.desync(queue, job)
}
/// Schedules `job` to run asynchronously on `queue` at some point in
/// the future.
pub fn desync<TFn: 'static+Send+FnOnce() -> ()>(&self, queue: &Arc<JobQueue>, job: TFn) {
self.schedule_job_desync(queue, Box::new(Job::new(job)));
}
/// Pushes a boxed job onto a queue and, if the queue was idle, adds the
/// queue to the schedule and tries to wake a thread for it.
///
/// Panics if the queue has previously panicked.
fn schedule_job_desync(&self, queue: &Arc<JobQueue>, job: Box<dyn ScheduledJob>) {
// Snapshot of the queue's state, taken while its lock is held
enum ScheduleState {
Idle,
Running,
Panicked
}
let schedule_queue = {
// Push the job under the queue lock and note whether the queue
// needs to be added to the schedule
let mut core = queue.core.lock().expect("JobQueue core lock");
core.queue.push_back(job);
match core.state {
QueueState::Idle => {
// Mark as pending so it isn't scheduled twice concurrently
core.state = QueueState::Pending;
ScheduleState::Idle
},
QueueState::Panicked => ScheduleState::Panicked,
_=> {
// Already running or waiting: the job will be picked up in turn
ScheduleState::Running
}
}
};
// Act on the snapshot after the queue lock has been released
match schedule_queue {
ScheduleState::Idle => {
self.core.schedule.lock().expect("Schedule lock").push_back(queue.clone());
self.schedule_thread();
},
ScheduleState::Running => { }
ScheduleState::Panicked => {
panic!("Cannot schedule jobs on a panicked queue");
},
}
}
/// Schedules a future-producing job on a queue and returns a future
/// that resolves with that future's output once it completes.
pub fn future_desync<TFn, TFuture>(&self, queue: &Arc<JobQueue>, job: TFn) -> SchedulerFuture<TFuture::Output>
where TFn: 'static+Send+FnOnce() -> TFuture,
TFuture: 'static+Send+Future,
TFuture::Output: Send {
let (receive, send) = SchedulerFuture::new(queue, Arc::clone(&self.core));
let perform_job = FutureJob::new(move || {
// Create the caller's future on the queue, then signal its result
// when it finishes
let job = job();
async {
let val = job.await;
send.signal(val);
}
});
self.schedule_job_desync(queue, Box::new(perform_job));
receive
}
/// Runs a future-producing job in step with the queue: the returned
/// future completes once the queue has reached the job and the job's
/// own future has finished.
pub fn future_sync<'a, TFn, TFuture>(&'a self, queue: &Arc<JobQueue>, job: TFn) -> impl 'a+Future<Output=Result<TFuture::Output, oneshot::Canceled>>+Send
where TFn: 'a+Send+FnOnce() -> TFuture,
TFuture: 'a+Send+Future,
TFuture::Output: Send {
let job = Box::new(job);
// queue_ready fires when the queue reaches this job; done fires when
// the SyncFuture reports the caller's future has finished
let (queue_ready_send, queue_ready_recv) = oneshot::channel();
let (done_send, done_recv) = oneshot::channel();
let (receive, send) = SchedulerFuture::new(queue, Arc::clone(&self.core));
// This job occupies the queue until `done_recv` is signalled
let signal_job = FutureJob::new(move || {
async {
queue_ready_send.send(()).ok();
done_recv.await.ok();
send.signal(());
}
});
self.schedule_job_desync(queue, Box::new(signal_job));
SyncFuture::new(move || job().boxed(), receive, queue_ready_recv, done_send)
}
/// Schedules `job` to run on `queue` once the `after` future has
/// completed, returning a future for the job's result.
pub fn after<TFn, Res: 'static+Send, Fut: 'static+Future+Send>(&self, queue: &Arc<JobQueue>, after: Fut, job: TFn) -> impl Future<Output=Result<Res, oneshot::Canceled>>+Send
where TFn: 'static+Send+FnOnce(Fut::Output) -> Res {
let (receive, send) = SchedulerFuture::new(queue, Arc::clone(&self.core));
let perform_job = FutureJob::new(move || { async {
let val = after.await;
let result = job(val);
send.signal(result);
}
});
self.schedule_job_desync(queue, Box::new(perform_job));
receive
}
/// Suspends a queue: after the currently scheduled jobs have run, the
/// queue waits until the `QueueResumer` delivered by the returned
/// future is used to resume it.
pub fn suspend(&self, queue: &Arc<JobQueue>) -> impl Future<Output=Result<QueueResumer, oneshot::Canceled>>+Send {
let (finished_suspending, notify_finished_suspending) = SchedulerFuture::new(queue, Arc::clone(&self.core));
self.future_desync(queue, move || {
// Hand the resumer to the caller, then park the queue on the
// oneshot until the resumer fires
let (resume, wait_for_resume) = oneshot::channel();
let queue_resumer = QueueResumer { resume };
notify_finished_suspending.signal(queue_resumer);
wait_for_resume
}).detach();
finished_suspending
}
/// Runs a job directly on the current thread; used when the queue is
/// claimed and has no other pending jobs. The queue must already be in
/// a running state (checked in debug builds).
fn sync_immediate<Result, TFn: FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
debug_assert!(queue.core.lock().expect("JobQueue core lock").state.is_running());
let _active = ActiveQueue { queue: &*queue };
let result = job();
// Return the queue to idle and let any jobs queued meanwhile run
queue.core.lock().expect("JobQueue core lock").state = QueueState::Idle;
self.reschedule_queue(queue);
result
}
/// Runs a job synchronously by draining the queue on the current thread
/// until the job itself has produced a result. The queue must already
/// be in a running state (checked in debug builds).
fn sync_drain<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
debug_assert!(queue.core.lock().expect("JobQueue core lock").state.is_running());
let _active = ActiveQueue { queue: &*queue };
// The queued job stores its result here and signals the condvar
let result = Arc::new((Mutex::new(None), Condvar::new()));
let queue_result = result.clone();
let result_job = Box::new(Job::new(move || {
let job_result = job();
*queue_result.0.lock().expect("Sync queue result lock") = Some(job_result);
queue_result.1.notify_one();
}));
// NOTE(review): UnsafeJob holds a raw borrow of result_job; this relies
// on result_job outliving the queued job, which appears to hold because
// the drain loop below runs it before this function returns — confirm
// against UnsafeJob's contract
let unsafe_result_job = UnsafeJob::new(&*result_job);
queue.core.lock().expect("JobQueue core lock").queue.push_back(Box::new(unsafe_result_job));
// Run queued jobs on this thread until our job has stored its result
while result.0.lock().expect("Sync queue result lock").is_none() {
match JobQueue::run_one_job_now(queue) {
JobStatus::Finished | JobStatus::NoJobsWaiting => { },
}
}
queue.core.lock().expect("JobQueue core lock").state = QueueState::Idle;
self.reschedule_queue(queue);
let mut old_result = result.0.lock().expect("Sync queue result lock");
let final_result = old_result.take();
final_result.expect("Finished sync request without result")
}
/// Queues a job and blocks the current thread until it has been run in
/// the background, returning its result.
fn sync_background<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
// Mutex+condvar pair used to wait for the job's result
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = pair.clone();
let job = Box::new(Job::new(move || {
let &(ref result, ref cvar) = &*pair2;
let actual_result = job();
*result.lock().expect("Background job result lock") = Some(actual_result);
cvar.notify_one();
}));
let need_reschedule = {
// NOTE(review): UnsafeJob holds a raw borrow of job; this relies on
// the condvar wait below keeping this frame (and job) alive until
// the queued job has finished — confirm against UnsafeJob's contract
let unsafe_job = Box::new(UnsafeJob::new(&*job));
let mut core = queue.core.lock().expect("JobQueue core lock");
core.queue.push_back(unsafe_job);
// An idle queue won't be picked up unless it is rescheduled
core.state == QueueState::Idle
};
if need_reschedule { self.reschedule_queue(queue); }
// Block until the job signals that a result is available; the loop
// guards against spurious condvar wakeups
let &(ref lock, ref cvar) = &*pair;
let mut result = lock.lock().expect("Background job result lock");
while result.is_none() {
result = cvar.wait(result).expect("Background job cvar wait");
}
let final_result = result.take();
final_result.expect("Finished background sync job without result")
}
/// Runs a job synchronously on a queue, choosing a strategy based on
/// the queue's current state, and returns the job's result.
///
/// Panics if the queue has previously panicked.
pub fn sync<Result: Send, TFn: Send+FnOnce() -> Result>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result {
// How the job should be executed, decided under the queue lock
enum RunAction {
Immediate,
DrainOnThisThread,
WaitForBackground,
Panic
}
let run_action = {
let mut core = queue.core.lock().expect("JobQueue core lock");
match core.state {
// Busy or parked elsewhere: wait for the background to run it
QueueState::Running => RunAction::WaitForBackground,
QueueState::WaitingForWake => RunAction::WaitForBackground,
QueueState::WaitingForUnpark => RunAction::WaitForBackground,
QueueState::WaitingForPoll(_) => RunAction::WaitForBackground,
QueueState::AwokenWhileRunning => RunAction::WaitForBackground,
QueueState::Panicked => RunAction::Panic,
// Pending: claim the queue and drain it on this thread
QueueState::Pending => { core.state = QueueState::Running; RunAction::DrainOnThisThread },
QueueState::Idle => {
// Claim the queue; run directly if it's empty, drain otherwise
core.state = QueueState::Running;
if core.queue.len() == 0 {
RunAction::Immediate
} else {
RunAction::DrainOnThisThread
}
}
}
};
match run_action {
RunAction::Immediate => self.sync_immediate(queue, job),
RunAction::DrainOnThisThread => self.sync_drain(queue, job),
RunAction::WaitForBackground => self.sync_background(queue, job),
RunAction::Panic => panic!("Cannot schedule new jobs on a panicked queue")
}
}
/// Non-blocking variant of `sync`: runs the job immediately only if the
/// queue is idle and empty, otherwise returns `TrySyncError::Busy`.
///
/// Panics if the queue has previously panicked.
pub fn try_sync<FnResult: Send, TFn: Send+FnOnce() -> FnResult>(&self, queue: &Arc<JobQueue>, job: TFn) -> Result<FnResult, TrySyncError> {
enum RunAction {
Immediate,
Busy,
Panic
}
let run_action = {
let mut core = queue.core.lock().expect("JobQueue core lock");
match core.state {
QueueState::Running => RunAction::Busy,
QueueState::WaitingForWake => RunAction::Busy,
QueueState::WaitingForUnpark => RunAction::Busy,
QueueState::WaitingForPoll(_) => RunAction::Busy,
QueueState::AwokenWhileRunning => RunAction::Busy,
QueueState::Panicked => RunAction::Panic,
QueueState::Pending => RunAction::Busy,
QueueState::Idle => {
core.state = QueueState::Running;
if core.queue.len() == 0 {
RunAction::Immediate
} else {
// Jobs are already queued even though the state is idle
RunAction::Busy
}
}
}
};
match run_action {
RunAction::Immediate => Ok(self.sync_immediate(queue, job)),
RunAction::Busy => Err(TrySyncError::Busy),
RunAction::Panic => panic!("Cannot schedule new jobs on a panicked queue")
}
}
/// As `sync`, but returns `true` instead of panicking when the queue is
/// in the panicked state (and `false` after running the job normally).
pub (crate) fn sync_no_panic<TFn: Send+FnOnce() -> ()>(&self, queue: &Arc<JobQueue>, job: TFn) -> bool {
enum RunAction {
Immediate,
DrainOnThisThread,
WaitForBackground,
Panic
}
let run_action = {
let mut core = queue.core.lock().expect("JobQueue core lock");
match core.state {
QueueState::Running => RunAction::WaitForBackground,
QueueState::WaitingForWake => RunAction::WaitForBackground,
QueueState::WaitingForUnpark => RunAction::WaitForBackground,
QueueState::WaitingForPoll(_) => RunAction::WaitForBackground,
QueueState::AwokenWhileRunning => RunAction::WaitForBackground,
QueueState::Panicked => RunAction::Panic,
QueueState::Pending => { core.state = QueueState::Running; RunAction::DrainOnThisThread },
QueueState::Idle => {
core.state = QueueState::Running;
if core.queue.len() == 0 {
RunAction::Immediate
} else {
RunAction::DrainOnThisThread
}
}
}
};
match run_action {
RunAction::Immediate => { self.sync_immediate(queue, job); false },
RunAction::DrainOnThisThread => { self.sync_drain(queue, job); false },
RunAction::WaitForBackground => { self.sync_background(queue, job); false },
RunAction::Panic => true
}
}
}
impl fmt::Debug for Scheduler {
    /// Formats the scheduler as a busy/idle summary ('B' or 'I' per pool
    /// thread) followed by the number of queues waiting to be scheduled.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        // One character per pool thread, read under the thread-list lock
        let busyness: String = {
            let thread_list = self.core.threads.lock().expect("Scheduler threads lock");
            thread_list
                .iter()
                .map(|&(ref busy, _)| if *busy.lock().expect("Thread busy lock") { 'B' } else { 'I' })
                .collect()
        };

        let pending = self.core.schedule.lock().expect("Schedule lock").len();
        let summary = format!("Pending queue count: {}", pending);
        fmt.write_str(&format!("{} {}", busyness, summary))
    }
}
/// Retrieves the global scheduler instance.
///
/// The scheduler lives in a lazily-initialised static, so the reference is
/// valid for the whole program. Returning `&'static` (rather than a free,
/// caller-chosen `&'a`) states this honestly and remains backward
/// compatible, since `'static` satisfies any lifetime a caller requested.
pub fn scheduler() -> &'static Scheduler {
    &SCHEDULER
}
/// Creates a new, empty job queue on the global scheduler.
pub fn queue() -> Arc<JobQueue> {
    let global = scheduler();
    global.create_job_queue()
}
/// Performs a job asynchronously on the given queue.
///
/// Deprecated alias of `desync`: `async` became a reserved word in Rust
/// 2018, so the operation was renamed.
#[inline]
#[deprecated(since="0.3.0", note="please use `desync` instead")]
pub fn r#async<TFn: 'static+Send+FnOnce() -> ()>(queue: &Arc<JobQueue>, job: TFn) {
desync(queue, job)
}
/// Schedules `job` to run asynchronously on `queue`, via the global
/// scheduler.
pub fn desync<TFn: 'static+Send+FnOnce() -> ()>(queue: &Arc<JobQueue>, job: TFn) {
    let global = scheduler();
    global.desync(queue, job)
}
/// Schedules a future-producing job on `queue` via the global scheduler,
/// returning a future that resolves with the job's output.
pub fn future_desync<TFn, TFuture>(queue: &Arc<JobQueue>, job: TFn) -> SchedulerFuture<TFuture::Output>
where TFn: 'static+Send+FnOnce() -> TFuture,
TFuture: 'static+Send+Future,
TFuture::Output: Send {
    let global = scheduler();
    global.future_desync(queue, job)
}
/// Runs a future-producing job in step with `queue` on the global
/// scheduler: the returned future completes once the queue has reached
/// the job and the job's future has finished.
pub fn future_sync<'a, TFn, TFuture>(queue: &Arc<JobQueue>, job: TFn) -> impl 'a+Future<Output=Result<TFuture::Output, oneshot::Canceled>>+Send
where TFn: 'a+Send+FnOnce() -> TFuture,
TFuture: 'a+Send+Future,
TFuture::Output: Send {
    let global = scheduler();
    global.future_sync(queue, job)
}
/// Runs `job` synchronously on `queue` via the global scheduler and
/// returns its result.
pub fn sync<Result: Send, TFn: Send+FnOnce() -> Result>(queue: &Arc<JobQueue>, job: TFn) -> Result {
    let global = scheduler();
    global.sync(queue, job)
}
/// Non-blocking `sync` on the global scheduler: runs `job` immediately if
/// `queue` is free, otherwise returns `TrySyncError::Busy`.
pub fn try_sync<FnResult: Send, TFn: Send+FnOnce() -> FnResult>(queue: &Arc<JobQueue>, job: TFn) -> Result<FnResult, TrySyncError> {
    let global = scheduler();
    global.try_sync(queue, job)
}