use futures::future::{Future, ExecuteError, Executor};
use futures::executor::{self, Notify, Spawn};
use futures::Async;
use std::collections::VecDeque;
use std::result::Result as StdResult;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::cmp;
use webcore::try_from::TryInto;
use webcore::value::Reference;
/// Capacity the microtask queue is created with. It is also the floor used
/// when the queue is reallocated to a smaller size.
const INITIAL_QUEUE_CAPACITY: usize = 10;
/// Number of consecutive drains that must observe the queue using less than
/// half of its capacity before the next such drain reallocates it.
const QUEUE_SHRINK_DELAY: usize = 25;
/// Produces a new owning `Rc` for the allocation behind `ptr` without
/// consuming the reference that `ptr` itself stands for.
///
/// The strong count ends up exactly one higher than before the call.
///
/// # Safety
///
/// `ptr` must point at a value that lives inside an `Rc< T >` which is still
/// alive (strong count of at least one) for the duration of the call.
unsafe fn clone_raw< T >( ptr: *const T ) -> Rc< T > {
    // Temporarily materialise the reference that `ptr` represents...
    let original = Rc::from_raw( ptr );
    // ...take an extra strong reference for the caller...
    let duplicate = Rc::clone( &original );
    // ...and hand the original reference back to whoever holds `ptr`.
    ::std::mem::forget( original );
    duplicate
}
/// Heap-allocated, type-erased future that yields `()` and fails with `()`.
/// `SpawnedTask` stores every spawned future in this form.
type BoxedFuture = Box< Future< Item = (), Error = () > + 'static >;
/// A future that has been handed to the event loop, plus its scheduling state.
struct SpawnedTask {
/// Set to `true` by `SpawnedTask::notify` when the task is pushed onto the
/// event loop queue and reset to `false` by `SpawnedTask::poll`, so a task
/// is pushed at most once between polls.
is_queued: Cell< bool >,
/// The running future. `poll` takes it out and only puts it back when the
/// poll result is `Ok( Async::NotReady )`, so it is `None` afterwards
/// otherwise.
spawn: RefCell< Option< Spawn< BoxedFuture > > >,
}
impl SpawnedTask {
fn new< F >( future: F ) -> Rc< Self >
where F: Future< Item = (), Error = () > + 'static {
Rc::new( Self {
is_queued: Cell::new( false ),
spawn: RefCell::new( Some( executor::spawn(
Box::new( future ) as BoxedFuture
) ) )
} )
}
fn poll( &self ) {
let mut spawn = self.spawn.borrow_mut();
if let Some( mut spawn_future ) = spawn.take() {
self.is_queued.set( false );
if spawn_future.poll_future_notify( &&EventLoop, self as *const _ as usize ) == Ok( Async::NotReady ) {
*spawn = Some( spawn_future );
}
}
}
fn notify( task: Rc< SpawnedTask > ) {
if !task.is_queued.replace( true ) {
EventLoop.push_task(task);
}
}
}
/// Zero-sized handle whose methods forward to the thread-local
/// `EventLoopInner`. It is the type that implements `Executor` and `Notify`.
struct EventLoop;
// Per-thread event loop state, built with `EventLoopInner::new()`.
thread_local! {
static EVENT_LOOP_INNER: EventLoopInner = EventLoopInner::new();
}
impl EventLoop {
    /// Runs `EventLoopInner::drain` on this thread's event loop state.
    fn drain(&self) {
        EVENT_LOOP_INNER.with(|inner| inner.drain())
    }

    /// Hands `task` to `EventLoopInner::push_task` on this thread's event
    /// loop state.
    fn push_task(&self, task: Rc< SpawnedTask >) {
        EVENT_LOOP_INNER.with(move |inner| inner.push_task(task))
    }
}
/// State behind `EventLoop`: the pending tasks, the JavaScript function that
/// schedules a drain, and bookkeeping for shrinking the queue.
struct EventLoopInner {
/// Tasks waiting to be polled. `push_task` appends at the back and
/// `pop_task` removes from the front.
microtask_queue: RefCell< VecDeque< Rc< SpawnedTask > > >,
/// Reference to the JavaScript function built in `new`; `wake` calls it and
/// the `Drop` impl calls its `drop` method.
waker: Reference,
/// Count of consecutive `estimate_realloc_capacity` calls that found the
/// queue using less than half of its capacity.
shrink_counter: Cell<usize>
}
impl Drop for EventLoopInner {
    /// Calls the waker's JavaScript `drop` method, which (as defined in
    /// `EventLoopInner::new`) marks the drain callback as dropped and
    /// releases it.
    #[inline]
    fn drop( &mut self ) {
        let waker = &self.waker;
        js! { @(no_return)
            @{waker}.drop();
        }
    }
}
impl EventLoopInner {
fn new() -> Self {
EventLoopInner {
microtask_queue: RefCell::new(VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY)),
waker: js!(
var callback = @{|| EventLoop.drain()};
var wrapper = function() {
if (!callback.dropped) { callback() }
};
var nextTick;
if ( typeof MutationObserver === "function" ) {
var node = document.createTextNode( "0" );
var state = false;
new MutationObserver( wrapper ).observe( node, { characterData: true } );
nextTick = function() {
state = !state;
node.data = ( state ? "1" : "0" );
};
} else {
var promise = Promise.resolve( null );
nextTick = function() {
promise.then( wrapper );
};
}
nextTick.drop = function() {
callback.dropped = true;
callback.drop();
};
return nextTick;
).try_into().unwrap(),
shrink_counter: Cell::new(0)
}
}
fn push_task(&self, task: Rc< SpawnedTask >) {
let mut queue = self.microtask_queue.borrow_mut();
queue.push_back(task);
if queue.len() == 1 {
self.wake();
}
}
fn wake(&self) {
js! { @(no_return) @{&self.waker}(); }
}
fn pop_task(&self) -> Option< Rc< SpawnedTask > > {
self.microtask_queue.borrow_mut().pop_front()
}
fn estimate_realloc_capacity(&self) -> Option<usize> {
let queue = self.microtask_queue.borrow();
let half_cap = queue.capacity()/2;
if half_cap > queue.len() && half_cap > INITIAL_QUEUE_CAPACITY {
let shrink_counter = self.shrink_counter.get();
if shrink_counter < QUEUE_SHRINK_DELAY {
self.shrink_counter.set(shrink_counter + 1);
} else {
self.shrink_counter.set(0);
return Some(cmp::max(queue.len(), INITIAL_QUEUE_CAPACITY));
}
} else {
self.shrink_counter.set(0);
}
None
}
fn drain(&self) {
let maybe_realloc_capacity = self.estimate_realloc_capacity();
while let Some(task) = self.pop_task() {
task.poll();
}
if let Some(realloc_capacity) = maybe_realloc_capacity {
*self.microtask_queue.borrow_mut() = VecDeque::with_capacity(realloc_capacity);
}
}
}
impl< F > Executor< F > for EventLoop where
F: Future< Item = (), Error = () > + 'static {
fn execute( &self, future: F ) -> StdResult< (), ExecuteError< F > > {
SpawnedTask::notify( SpawnedTask::new( future ) );
Ok( () )
}
}
impl Notify for EventLoop {
    /// Queues the task identified by `spawned_id`. The id itself is left
    /// intact; an additional strong reference is taken for the queue.
    fn notify( &self, spawned_id: usize ) {
        let raw = spawned_id as *const SpawnedTask;
        // SAFETY: ids are addresses of `SpawnedTask`s, which `SpawnedTask::new`
        // only ever creates inside an `Rc`.
        let task = unsafe { clone_raw( raw ) };
        SpawnedTask::notify( task );
    }

    /// Returns an id that owns one additional strong reference to the task
    /// behind `id`.
    fn clone_id( &self, id: usize ) -> usize {
        let raw = id as *const SpawnedTask;
        // SAFETY: see `notify`.
        let extra = unsafe { clone_raw( raw ) };
        Rc::into_raw( extra ) as usize
    }

    /// Releases the strong reference owned by `id`.
    fn drop_id( &self, id: usize ) {
        let raw = id as *const SpawnedTask;
        // SAFETY: relies on `id` owning a strong reference, as the ids
        // returned by `clone_id` do.
        drop( unsafe { Rc::from_raw( raw ) } );
    }
}
#[inline]
pub fn spawn< F >( future: F ) where
F: Future< Item = (), Error = () > + 'static {
EventLoop.execute( future ).unwrap();
}