#![feature(generators, generator_trait, register_attr)]
#![register_attr(osaka)]
#![feature(termination_trait_lib)]
pub extern crate osaka_macros;
pub extern crate mio;
pub extern crate log;
use std::io::Error;
use std::ops::{Generator, GeneratorState};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use log::{warn, debug};
use std::time::{Duration, Instant};
use std::pin::Pin;
pub use osaka_macros::osaka;
#[macro_export]
macro_rules! sync {
($task:expr) => {{
use osaka::FutureResult;
use osaka::Future;
loop {
match $task.poll() {
FutureResult::Done(y) => {
break y;
}
FutureResult::Again(y) => {
yield y;
}
}
}
};};
}
#[macro_export]
macro_rules! try{
($e:expr) => {
match $e {
Err(e) => return $crate::FutureResult::Done(Err(e.into())),
Ok(v) => v,
}
}
}
#[derive(Clone)]
pub struct Token {
m: mio::Token,
active: Arc<AtomicUsize>,
}
#[derive(Clone)]
pub struct Again {
tokens: Vec<Token>,
deadline: Option<Instant>,
poll: Arc<mio::Poll>,
}
impl Again {
pub fn merge(&mut self, mut other: Again) {
if let Some(mut t2) = other.deadline {
let t = if let Some(ref mut t1) = self.deadline {
if t1 > &mut t2 {
true
} else {
false
}
} else {
true
};
if t {
self.deadline = Some(t2);
}
}
self.tokens.append(&mut other.tokens);
}
}
pub enum FutureResult<F> {
Done(F),
Again(Again),
}
pub trait Future<R> {
fn poll(&mut self) -> FutureResult<R>;
}
impl<R,X> Future<R> for X
where X: Generator<Yield = Again, Return = R> + Unpin
{
fn poll(&mut self) -> FutureResult<R> {
match Pin::new(self).resume() {
GeneratorState::Complete(y) => {
FutureResult::Done(y)
}
GeneratorState::Yielded(a) => {
FutureResult::Again(a)
}
}
}
}
#[derive(Clone)]
pub struct Poll {
tokens: Arc<AtomicUsize>,
poll: Arc<mio::Poll>,
}
impl Poll {
pub fn register<E: ?Sized>(
&self,
handle: &E,
interest: mio::Ready,
opts: mio::PollOpt,
) -> Result<Token, Error>
where
E: mio::Evented,
{
let token = mio::Token(self.tokens.fetch_add(1, Ordering::SeqCst));
self.poll.register(handle, token, interest, opts)?;
Ok(Token{
m: token,
active: Arc::new(AtomicUsize::new(0)),
})
}
pub fn new() -> Self {
Self {
tokens: Arc::new(AtomicUsize::new(0)),
poll: Arc::new(mio::Poll::new().unwrap()),
}
}
pub fn never(&self) -> Again {
Again { poll: self.poll.clone(), tokens: Vec::new(), deadline: None }
}
pub fn later(&self, deadline: Duration) -> Again {
Again { poll: self.poll.clone(), tokens: Vec::new(), deadline: Some(Instant::now() + deadline)}
}
pub fn again(&self, token: Token, deadline: Option<Duration>) -> Again {
Again { poll: self.poll.clone(), tokens: vec![token], deadline: deadline.map(|v|Instant::now() + v) }
}
pub fn any(&self, tokens:Vec<Token>, deadline: Option<Duration>) -> Again {
Again { poll: self.poll.clone(), tokens, deadline: deadline.map(|v|Instant::now() + v) }
}
}
pub enum Task<R> {
Later {
f: Box<Future<R>>,
a: Again,
},
Immediate {
r: Option<R>,
}
}
impl<R> Task<R> {
pub fn run(&mut self) -> R {
loop {
match self {
Task::Immediate{r} => {
return r.take().expect("immediate polled after completion");
}
Task::Later{f,a} => {
let mut events = mio::Events::with_capacity(1024);
let mut timeout = None;
if let Some(deadline) = a.deadline {
let now = Instant::now();
if now > deadline {
log::trace!("deadline already expired. will loop in 1ms");
timeout = Some(Duration::from_millis(1));
} else {
timeout = Some(deadline - now);
}
}
if a.tokens.len() == 0 {
panic!("trying to run() with 0 tokens, this is not going to do anything useful.\n
forgot to pass a token with poll.again() ?");
}
debug!("going to poll with timeout {:?} and {} tokens",
timeout,
a.tokens.len());
a.poll.poll(&mut events, timeout).expect("poll");
for token in &a.tokens {
token.active.store(0, Ordering::SeqCst);
}
for event in &events {
for token in &a.tokens {
if event.token() == token.m {
debug!("token {:?} activated", token.m);
token.active.store(1, Ordering::SeqCst);
}
}
}
}
}
if let FutureResult::Done(v) = self.poll() {
return v;
}
}
}
pub fn new(f: Box<Future<R>>, a: Again) -> Self {
Task::Later{f,a}
}
pub fn wakeup_now(&mut self) {
if let Task::Later{f,a} = self {
a.deadline = Some(Instant::now());
}
}
pub fn immediate(t:R) -> Task<R> {
Task::Immediate{r:Some(t)}
}
}
impl<R> Future<R> for Task<R> {
fn poll(&mut self) -> FutureResult<R> {
match self {
Task::Immediate{r} => {
return FutureResult::Done(r.take().expect("immediate polled after completion"));
}
Task::Later{f,a} => {
let mut ready = false;
if let Some(deadline) = a.deadline {
if Instant::now() >= deadline {
debug!("task wakeup caused by deadline");
a.deadline = None;
ready = true;
}
}
if !ready {
for token in &a.tokens {
if token.active.load(Ordering::SeqCst) > 0 {
debug!("task wakeup caused by token readyness");
ready = true;
break;
}
}
}
if ready {
match f.poll() {
FutureResult::Done(y) => {
return FutureResult::Done(y);
},
FutureResult::Again(a2) => {
*a = a2;
}
}
}
FutureResult::Again(a.clone())
}
}
}
}
impl<E: std::fmt::Debug> std::process::Termination for Task<Result<(), E>> {
fn report(mut self) -> i32 {
match self.run() {
Ok(()) => 0,
Err(e) => {
eprintln!("{:?}", e);
2
}
}
}
}