1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
//!Runtime module, contains minimalistic runtimes

use core::{task};
use core::pin::Pin;
use core::future::Future;
use core::sync::atomic::{self, AtomicBool};

///Runs future with tokio IO driver enabled in single thread.
///
///## Implementation details
///
///- It setups reactor
///- It doesn't support `tokio-timer`. If you need timers, use my OS timers [crate](https://crates.io/crates/async-timer)
///
///## Usage
///
///```rust
///#![feature(async_await)]
///
///async fn connect() {
///    use std::net::ToSocketAddrs;
///    let addr = "google.com:80".to_socket_addrs().unwrap().next().unwrap();
///    println!("Connecting...");
///    match tokio_net::tcp::TcpStream::connect(&addr).await {
///        Ok(_) => {
///            println!("Successfully connected");
///        },
///        Err(error) => {
///            eprintln!("Failed to connect: {}", error);
///        }
///    };
///}
///
///cute_async::runtime::tokio(connect());
///```
pub fn tokio<F: Future>(mut work: F) -> F::Output {
    let mut reactor = tokio_net::driver::Reactor::new().expect("To create tokio reactor");
    let handle = reactor.handle();
    let _guard = tokio_net::driver::set_default(&handle);

    let waker = Waker::new();
    let raw_waker = waker.waker();
    let mut context = task::Context::from_waker(&raw_waker);

    loop {
        match Future::poll(unsafe { Pin::new_unchecked(&mut work) }, &mut context) {
            task::Poll::Ready(result) => break result,
            task::Poll::Pending => while !waker.take_woke_status() {
                match reactor.is_idle() {
                    true => std::thread::yield_now(),
                    false => {
                        reactor.turn(None).expect("To turn reactor");
                    }
                }
            }
        }
    }
}

static VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(vtab::on_clone, vtab::on_wake, vtab::on_wake, vtab::on_drop);

struct Waker {
    notified: AtomicBool,
}

impl Waker {
    pub const fn new() -> Self {
        Self {
            notified: AtomicBool::new(false),
        }
    }

    //We should guarantee that waker will remain available as we store raw pointer to it.
    pub fn waker(&self) -> task::Waker {
        unsafe { task::Waker::from_raw(task::RawWaker::new(self as *const _ as *const (), &VTABLE)) }
    }

    pub fn wake(&self) {
        self.notified.store(true, atomic::Ordering::Release);
    }

    pub fn take_woke_status(&self) -> bool {
        self.notified.swap(false, atomic::Ordering::AcqRel)
    }
}

mod vtab {
    use super::*;

    pub unsafe fn on_clone(data: *const ()) -> task::RawWaker {
        task::RawWaker::new(data, &VTABLE)
    }

    pub unsafe fn on_drop(_data: *const ()) {
    }

    pub unsafe fn on_wake(data: *const ()) {
        debug_assert!(!data.is_null());
        //I know, know, alignment and shit.
        //But who cares, pointer will be valid
        #[cfg_attr(feature = "cargo-clippy", allow(clippy::cast_ptr_alignment))]
        let waker = data as *const Waker;
        (*waker).wake();
    }
}