use futures::future::Either;
use futures::{self, Async, Future, IntoFuture, Poll};
use std::fmt;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
pub use self::schedule::{with_current_context, yield_poll, Context};
pub use self::schedule::{Scheduler, SchedulerHandle, SchedulerId};
use sync::oneshot::{self, Link, Monitor};
mod schedule;
pub type FiberId = usize;
pub type ContextId = (SchedulerId, FiberId);
pub trait Spawn {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>);
fn spawn<F>(&self, fiber: F)
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
self.spawn_boxed(Box::new(fiber));
}
fn spawn_fn<F, T>(&self, f: F)
where
F: FnOnce() -> T + Send + 'static,
T: IntoFuture<Item = (), Error = ()> + Send + 'static,
T::Future: Send,
{
self.spawn(futures::lazy(f))
}
fn spawn_monitor<F, T, E>(&self, f: F) -> Monitor<T, E>
where
F: Future<Item = T, Error = E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
let (monitored, monitor) = oneshot::monitor();
self.spawn(f.then(move |r| {
monitored.exit(r);
Ok(())
}));
monitor
}
fn spawn_link<F, T, E>(&self, f: F) -> Link<(), (), T, E>
where
F: Future<Item = T, Error = E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
let (link0, link1) = oneshot::link();
let future = SelectEither::new(f, link1).then(|result| {
match result {
Err(Either::A((result, link1))) => {
link1.exit(Err(result));
}
Ok(Either::A((result, link1))) => {
link1.exit(Ok(result));
}
_ => {
}
}
Ok(())
});
self.spawn(future);
link0
}
fn boxed(self) -> BoxSpawn
where
Self: Sized + Send + 'static,
{
BoxSpawn(Box::new(move |fiber| self.spawn_boxed(fiber)))
}
}
type BoxFn = Box<dyn Fn(Box<dyn Future<Item = (), Error = ()> + Send>) + Send + 'static>;
pub struct BoxSpawn(BoxFn);
impl Spawn for BoxSpawn {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
(self.0)(fiber);
}
fn boxed(self) -> BoxSpawn
where
Self: Sized + Send + 'static,
{
self
}
}
impl fmt::Debug for BoxSpawn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "BoxSpawn(_)")
}
}
#[derive(Debug)]
struct FiberState {
pub fiber_id: FiberId,
task: Task,
parks: usize,
unparks: Arc<AtomicUsize>,
pub in_run_queue: bool,
}
impl FiberState {
pub fn new(fiber_id: FiberId, task: Task) -> Self {
FiberState {
fiber_id,
task,
parks: 0,
unparks: Arc::new(AtomicUsize::new(0)),
in_run_queue: false,
}
}
pub fn run_once(&mut self) -> bool {
if self.parks > 0 && self.unparks.load(atomic::Ordering::SeqCst) > 0 {
self.parks -= 1;
self.unparks.fetch_sub(1, atomic::Ordering::SeqCst);
}
if let Ok(Async::NotReady) = self.task.0.poll() {
false
} else {
true
}
}
pub fn is_runnable(&self) -> bool {
self.parks == 0 || self.unparks.load(atomic::Ordering::SeqCst) > 0
}
pub fn park(
&mut self,
scheduler_id: schedule::SchedulerId,
scheduler: schedule::SchedulerHandle,
) -> Unpark {
self.parks += 1;
Unpark {
fiber_id: self.fiber_id,
unparks: Arc::clone(&self.unparks),
scheduler_id,
scheduler,
}
}
pub fn yield_once(&mut self) {
self.parks += 1;
self.unparks.fetch_add(1, atomic::Ordering::SeqCst);
}
}
#[derive(Debug)]
pub struct Unpark {
fiber_id: FiberId,
unparks: Arc<AtomicUsize>,
scheduler_id: schedule::SchedulerId,
scheduler: schedule::SchedulerHandle,
}
impl Unpark {
pub fn context_id(&self) -> ContextId {
(self.scheduler_id, self.fiber_id)
}
}
impl Drop for Unpark {
fn drop(&mut self) {
let old = self.unparks.fetch_add(1, atomic::Ordering::SeqCst);
if old == 0 {
self.scheduler.wakeup(self.fiber_id);
}
}
}
pub(crate) type FiberFuture = Box<dyn Future<Item = (), Error = ()> + Send>;
pub(crate) struct Task(pub FiberFuture);
impl fmt::Debug for Task {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Task(_)")
}
}
struct SelectEither<A, B>(Option<(A, B)>);
impl<A: Future, B: Future> SelectEither<A, B> {
fn new(a: A, b: B) -> Self {
SelectEither(Some((a, b)))
}
}
impl<A: Future, B: Future> Future for SelectEither<A, B> {
type Item = Either<(A::Item, B), (A, B::Item)>;
type Error = Either<(A::Error, B), (A, B::Error)>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (mut a, mut b) = self.0.take().expect("Cannot poll SelectEither twice");
match a.poll() {
Err(e) => return Err(Either::A((e, b))),
Ok(Async::Ready(v)) => return Ok(Async::Ready(Either::A((v, b)))),
Ok(Async::NotReady) => {}
}
match b.poll() {
Err(e) => return Err(Either::B((a, e))),
Ok(Async::Ready(v)) => return Ok(Async::Ready(Either::B((a, v)))),
Ok(Async::NotReady) => {}
}
self.0 = Some((a, b));
Ok(Async::NotReady)
}
}