use futures_core::future::{FutureObj, LocalFutureObj};
use futures_executor::enter;
use futures_core::task::{Spawn, SpawnError};
use futures_util::task::{self, ArcWake};
use std::future::Future;
use std::task::{Poll, Context};
use std::pin::Pin;
use std::rc::Rc;
use std::cell::{Cell, RefCell};
use std::sync::Arc;
use std::collections::VecDeque;
use std::cmp;
use webcore::try_from::TryInto;
use webcore::value::Reference;
// Capacity requested for the task queue when the executor is created. It is also
// the lower bound for the capacity requested when the queue is shrunk, and the
// queue is only considered for shrinking once its capacity is at least twice this.
const INITIAL_QUEUE_CAPACITY: usize = 10;
// Number of consecutive `estimate_realloc_capacity` calls that must find the queue
// under-used before a shrink is actually requested.
const QUEUE_SHRINK_DELAY: usize = 10;
// The unit of work accepted by this executor: a type-erased `'static` future
// with `()` output, stored as a `LocalFutureObj`.
pub(crate) type BoxedFuture = LocalFutureObj< 'static, () >;
// The part of a task that is mutated while polling; `Task` keeps it behind a `RefCell`.
#[derive(Debug)]
struct TaskInner {
// The future being driven by this task.
future: BoxedFuture,
// Handle to the executor whose queue this task is pushed onto when it is rescheduled.
executor: EventLoopExecutor,
}
// A spawned future together with its scheduling flag. Tasks are handled as
// `Arc< Task >` so that they can be turned into a waker (see the `ArcWake` impl).
#[derive(Debug)]
struct Task {
// While `true`, `Task::push_task` does nothing; `Task::poll` clears it right
// before polling the future so that a wake-up during the poll can be detected.
is_queued: Cell< bool >,
// The future and executor handle; mutably borrowed for the duration of a poll.
inner: RefCell< TaskInner >,
}
// NOTE(review): `Task` contains a `Cell`, a `RefCell` and an `Rc`-based executor
// handle, none of which are thread-safe, so these impls are not justified by the
// fields themselves. They are presumably present only because `ArcWake` requires
// `Send + Sync`, and rely on tasks and their wakers never leaving the thread that
// created them - TODO confirm this holds on every supported target.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
impl Task {
    /// Wraps `future` in a reference-counted task bound to `executor`.
    ///
    /// The task is created with `is_queued` already set, so the caller is
    /// expected to push it onto the executor's queue itself.
    fn new( executor: EventLoopExecutor, future: BoxedFuture ) -> Arc< Self > {
        Arc::new( Self {
            is_queued: Cell::new( true ),
            inner: RefCell::new( TaskInner {
                future,
                executor,
            } ),
        } )
    }

    /// Polls the task's future once, using the task itself as the waker.
    ///
    /// If the future is still pending and was woken while being polled, the
    /// task is pushed back onto the executor's queue. Once the future has
    /// completed the task is latched so that it can never be queued again.
    fn poll( arc: Arc< Self > ) {
        let mut lock = arc.inner.borrow_mut();

        // Reborrow through the guard so that `future` and `executor` can be
        // borrowed as disjoint fields.
        let lock = &mut *lock;

        // Clear the flag before polling: a wake-up during the poll sets it
        // again (see `push_task`), which is how a self-wake is detected below.
        arc.is_queued.set( false );

        let poll = {
            let waker = task::waker( arc.clone() );
            let cx = &mut Context::from_waker( &waker );
            Pin::new( &mut lock.future ).poll( cx )
        };

        match poll {
            Poll::Pending => {
                // Woken during the poll: `push_task` could not enqueue the
                // task because `inner` was borrowed, so do it here.
                if arc.is_queued.get() {
                    lock.executor.0.push_task( arc.clone() );
                }
            },
            Poll::Ready( () ) => {
                // A clone of the waker may outlive the future and be woken
                // later. Keeping the flag set turns such late wake-ups into
                // no-ops, so a completed future is never polled again.
                arc.is_queued.set( true );
            },
        }
    }

    /// Schedules the task on its executor unless the `is_queued` flag is
    /// already set.
    ///
    /// When called while the task is being polled (`inner` is mutably
    /// borrowed) only the flag is set; `poll` then re-queues the task itself.
    #[inline]
    fn push_task( arc: &Arc< Self > ) {
        if !arc.is_queued.replace( true ) {
            if let Ok( lock ) = arc.inner.try_borrow() {
                lock.executor.0.push_task( arc.clone() );
            }
        }
    }
}
impl ArcWake for Task {
    /// Waking a task means asking it to be scheduled on its executor again;
    /// all of the bookkeeping lives in `Task::push_task`.
    #[inline]
    fn wake_by_ref( arc_self: &Arc< Self > ) {
        Self::push_task( arc_self );
    }
}
// Mutable state of the executor's queue, kept behind a `RefCell` in `EventLoopQueue`.
#[derive(Debug)]
struct EventLoopInner {
// Tasks waiting to be polled; pushed at the back and popped from the front.
queue: VecDeque< Arc< Task > >,
// Running sum of the queue lengths sampled by `estimate_realloc_capacity`.
past_sum: usize,
// Number of samples accumulated in `past_sum`.
past_length: usize,
// Number of consecutive samples that found the queue under-used; reset to zero
// whenever a sample does not, or when a shrink is requested.
shrink_counter: usize,
}
// The task queue plus a flag recording whether it is currently being drained.
#[derive(Debug)]
struct EventLoopQueue {
// Queue contents and the statistics used to decide when to shrink it.
inner: RefCell< EventLoopInner >,
// `true` while `drain` is running its poll loop. `EventLoop::push_task` reads it
// to avoid scheduling another drain while one is already in progress.
is_draining: Cell< bool >,
}
impl EventLoopQueue {
    /// Records the current queue length and decides whether the queue's
    /// buffer is worth shrinking.
    ///
    /// Returns `Some( ( current_capacity, requested_capacity ) )` once the
    /// running average length has been below a quarter of the capacity for
    /// `QUEUE_SHRINK_DELAY` consecutive calls, provided the capacity is at
    /// least twice `INITIAL_QUEUE_CAPACITY`. Returns `None` otherwise.
    fn estimate_realloc_capacity( &self ) -> Option< ( usize, usize ) > {
        let mut state = self.inner.borrow_mut();

        let capacity = state.queue.capacity();
        let pending = state.queue.len();

        state.past_sum += pending;
        state.past_length += 1;
        let mean = state.past_sum / state.past_length;

        let underused = mean < capacity / 4 && capacity >= INITIAL_QUEUE_CAPACITY * 2;
        if !underused {
            state.shrink_counter = 0;
            return None;
        }

        state.shrink_counter += 1;
        if state.shrink_counter < QUEUE_SHRINK_DELAY {
            return None;
        }

        state.shrink_counter = 0;
        Some( ( capacity, cmp::max( mean * 2, INITIAL_QUEUE_CAPACITY ) ) )
    }

    /// Polls queued tasks until the queue is empty, then optionally replaces
    /// the queue with a smaller one.
    ///
    /// Does nothing (beyond acquiring the `enter` guard) if a drain is already
    /// in progress.
    ///
    /// # Panics
    ///
    /// Panics if `enter()` fails, or if a requested shrink does not actually
    /// yield a smaller capacity.
    fn drain( &self ) {
        let _enter = enter().expect( "EventLoopExecutor is already draining" );

        if self.is_draining.replace( true ) {
            return;
        }

        let shrink_plan = self.estimate_realloc_capacity();

        loop {
            // The borrow must end before the task is polled, because polling
            // may push more tasks onto this same queue. Binding the popped
            // value in its own statement drops the `RefMut` right here.
            let next = self.inner.borrow_mut().queue.pop_front();
            match next {
                Some( task ) => Task::poll( task ),
                None => break,
            }
        }

        if let Some( ( old_capacity, realloc_capacity ) ) = shrink_plan {
            let mut state = self.inner.borrow_mut();
            state.queue = VecDeque::with_capacity( realloc_capacity );

            let new_capacity = state.queue.capacity();
            // The replacement must actually be smaller than what it replaced...
            assert!( new_capacity < old_capacity );
            // ...and `VecDeque` must not have over-allocated to twice the request.
            assert!( new_capacity < realloc_capacity * 2 );
        }

        self.is_draining.set( false );
    }
}
// The executor's shared state: the task queue and the JavaScript function used
// to schedule a drain of that queue.
#[derive(Debug)]
struct EventLoop {
// The task queue; `EventLoopExecutor::new` hands a second handle to the JS callback.
queue: Rc< EventLoopQueue >,
// Reference to the JS function built in `EventLoopExecutor::new`. Calling it
// schedules `EventLoopQueue::drain`; its `drop` method is called from `Drop`.
waker: Reference,
}
impl EventLoop {
    /// Invokes the JavaScript `waker` function, which (as set up in
    /// `EventLoopExecutor::new`) arranges for the queue to be drained from a
    /// JS callback.
    fn queue_microtask( &self ) {
        js! { @(no_return) @{&self.waker}(); }
    }

    /// Appends `task` to the queue and schedules a drain if one is needed.
    ///
    /// A drain is only scheduled when the queue was empty before this push and
    /// no drain is currently running: a non-empty queue already has a drain
    /// pending, and a running drain will pick the new task up by itself.
    fn push_task( &self, task: Arc< Task > ) {
        let mut state = self.queue.inner.borrow_mut();
        state.queue.push_back( task );

        let is_only_task = state.queue.len() == 1;
        let draining = self.queue.is_draining.get();
        if is_only_task && !draining {
            self.queue_microtask();
        }
    }
}
// Releases the JavaScript side of the event loop by calling the `drop` method
// that `EventLoopExecutor::new` attaches to the waker function. That method marks
// the wrapper as dropped, so later callbacks are ignored, and drops the Rust
// callback captured on the JS side.
impl Drop for EventLoop {
#[inline]
fn drop( &mut self ) {
js! { @(no_return)
@{&self.waker}.drop();
}
}
}
// Cheaply clonable handle to the shared `EventLoop`; every task stores one so
// that it can re-queue itself.
#[derive(Debug, Clone)]
struct EventLoopExecutor( Rc< EventLoop > );
impl EventLoopExecutor {
// Creates an executor with an empty queue and builds the JavaScript function
// used to schedule drains of that queue.
//
// Panics if the freshly allocated queue reports a capacity of at least twice
// `INITIAL_QUEUE_CAPACITY`, or if the value returned by the `js!` block cannot
// be converted into a `Reference`.
fn new() -> Self {
let queue = VecDeque::with_capacity( INITIAL_QUEUE_CAPACITY );
// Guards the assumption that `VecDeque` does not over-allocate to twice
// the requested size.
assert!( queue.capacity() < INITIAL_QUEUE_CAPACITY * 2 );
let queue = Rc::new( EventLoopQueue {
inner: RefCell::new( EventLoopInner {
queue: queue,
past_sum: 0,
past_length: 0,
shrink_counter: 0,
} ),
is_draining: Cell::new( false ),
} );
// The JS below returns a `nextTick` function:
//   * `callback` is the Rust closure that drains the queue; `wrapper` calls it
//     unless `dropped` has been set.
//   * If `MutationObserver` exists, `wrapper` is registered as an observer on a
//     detached text node and `nextTick` triggers it by flipping the node's data
//     between "0" and "1".
//   * Otherwise `nextTick` chains `wrapper` onto an already resolved `Promise`.
//   * `nextTick.drop` sets `dropped` and drops the Rust closure; it is called
//     from `EventLoop`'s `Drop` impl.
let waker = {
// Second handle to the queue, moved into the JS-held closure.
let queue = queue.clone();
js!(
var callback = @{move || queue.drain()};
var dropped = false;
function wrapper() {
if ( !dropped ) {
callback();
}
}
var nextTick;
if ( typeof MutationObserver === "function" ) {
var node = document.createTextNode( "0" );
var state = false;
new MutationObserver( wrapper ).observe( node, { characterData: true } );
nextTick = function () {
state = !state;
node.data = ( state ? "1" : "0" );
};
} else {
var promise = Promise.resolve( null );
nextTick = function () {
promise.then( wrapper );
};
}
nextTick.drop = function () {
dropped = true;
callback.drop();
};
return nextTick;
).try_into().unwrap()
};
EventLoopExecutor( Rc::new( EventLoop { queue, waker } ) )
}
// Wraps `future` in a new `Task` bound to this executor and pushes it onto the
// queue, which schedules a drain if necessary (see `EventLoop::push_task`).
#[inline]
fn spawn_local( &self, future: BoxedFuture ) {
self.0.push_task( Task::new( self.clone(), future ) );
}
}
impl Spawn for EventLoopExecutor {
#[inline]
fn spawn_obj( &mut self, future: FutureObj< 'static, () > ) -> Result< (), SpawnError > {
self.spawn_local( future.into() );
Ok( () )
}
}
/// Queues `future` on the calling thread's executor.
///
/// The executor lives in a thread-local and is created the first time this
/// function is called on a given thread.
pub(crate) fn spawn_local( future: BoxedFuture ) {
    thread_local! {
        static EVENT_LOOP: EventLoopExecutor = EventLoopExecutor::new();
    }

    EVENT_LOOP.with( move |executor| {
        executor.spawn_local( future );
    } )
}