1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//!Multi-threaded runtime module
//!
//!Runtime is stored in static variable.
//!
//!## Example
//!
//!```rust
//!use tokio_global::multi as rt;
//!use tokio_global::AutoRuntime;
//!
//!let _guard = rt::init();
//!//Now we can execute futures using the runtime.
//!//Once the guard goes out of scope, though,
//!//the runtime can no longer be used.
//!
//!let result = futures::future::ok::<u32, u32>(5).finish().expect("Ok");
//!assert_eq!(result, 5);
//!
//!```

use futures::{IntoFuture, Future};
use tokio::runtime;

use super::AutoRuntime;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::io;

//States stored in `GLOBAL_GUARD`.
//
//No runtime is stored
const UNINITIALIZED: usize = 0;
//Runtime slot is being modified: used both while the runtime is being stored
//and while it is being taken out again on guard drop
const INITIALIZING: usize = 1;
//Runtime is stored and may be used
const INITIALIZED: usize = 2;

//Current state of the global runtime slot; holds one of the constants above
static GLOBAL_GUARD: AtomicUsize = AtomicUsize::new(UNINITIALIZED);
//Storage of the global runtime; every access in this module is made
//after checking or claiming `GLOBAL_GUARD`
static mut TOKIO: Option<runtime::Runtime> = None;
//Panic message used when runtime is requested while guard is not `INITIALIZED`
const RUNTIME_NOT_AVAIL: &str = "Runtime is not set";

///Tokio runtime guard
///
///Runtime gets terminated as soon as guard goes out of scope,
///so the guard has to be kept alive for as long as the runtime is needed.
///
///Binding it to `_` or ignoring the returned value drops it immediately,
///hence the `must_use` hint.
#[must_use = "runtime is shut down as soon as the guard is dropped"]
pub struct Runtime {
}

impl Runtime {
    ///Creates new instance with default configuration
    ///
    ///Uses a fresh builder with `yukikaze` set as its name prefix.
    ///
    ///## Errors
    ///
    ///Returns the I/O error reported by the builder when runtime cannot be built.
    ///
    ///## Panics
    ///
    ///If runtime is already initialized.
    pub fn new() -> io::Result<Self> {
        Self::from_build(runtime::Builder::new().name_prefix("yukikaze"))
    }

    ///Creates new instance from provided builder
    ///
    ///This function must be called prior to any usage of runtime related functionality
    ///
    ///## Errors
    ///
    ///Returns the I/O error reported by the builder when runtime cannot be built.
    ///In that case the global state is left untouched.
    ///
    ///## Panics
    ///
    ///If runtime is already initialized.
    pub fn from_build(builder: &mut runtime::Builder) -> io::Result<Self> {
        //Build first so that a failing builder never touches the global state.
        let runtime = builder.build()?;

        //Acquire on success orders this thread after the store that finished
        //a previous guard's teardown, so writing the slot below cannot race with it.
        match GLOBAL_GUARD.compare_exchange(UNINITIALIZED, INITIALIZING, Ordering::AcqRel, Ordering::Acquire) {
            //SAFETY: winning the exchange makes this thread the only one in `INITIALIZING`
            //state, and users of the slot only touch it after observing `INITIALIZED`.
            Ok(_) => unsafe {
                TOKIO = Some(runtime);
                GLOBAL_GUARD.store(INITIALIZED, Ordering::SeqCst);
            },
            Err(_) => panic!("Setting tokio runtime twice")
        }

        Ok(Self {
        })
    }
}

impl Drop for Runtime {
    ///Takes the global runtime out of its slot, requests its immediate shutdown
    ///and marks the global state as uninitialized again.
    ///
    ///## Panics
    ///
    ///If global state is not initialized at the moment of drop.
    fn drop(&mut self) {
        //Acquire on success orders this thread after the store that published the runtime,
        //so the slot content written during initialization is visible here.
        match GLOBAL_GUARD.compare_exchange(INITIALIZED, INITIALIZING, Ordering::AcqRel, Ordering::Acquire) {
            //SAFETY: winning the exchange makes this thread the only one in `INITIALIZING`
            //state, so nobody else claims the slot until the store below.
            Ok(_) => unsafe {
                match TOKIO.take() {
                    Some(runtime) => {
                        //NOTE(review): value returned by `shutdown_now` is discarded;
                        //confirm whether shutdown completion should be waited for.
                        runtime.shutdown_now();
                    },
                    //State was `INITIALIZED`, so the slot is expected to be filled.
                    None => unreach!(),
                }

                GLOBAL_GUARD.store(UNINITIALIZED, Ordering::SeqCst);
            },
            Err(_) => panic!("Runtime is not set, but dropping global guard!"),
        }
    }
}

///Creates runtime with default configuration and returns guard that controls its lifetime.
///
///It has to be called before any other runtime related functionality is used.
///
///## Panics
///
///- If runtime is already initialized.
///- If runtime cannot be created.
pub fn init() -> Runtime {
    let created = Runtime::new();
    created.expect("To create runtime")
}

///Calls function that produces future, then blocks
///until that future is finished on tokio runtime.
///
///## Note
///
///It must not be used within blocking call like [run](fn.run.html)
///
///## Panics
///
///If runtime is not initialized.
pub fn run<F, R, IF, I, E>(runner: F) -> Result<R::Item, R::Error>
    where F: FnOnce() -> R,
          R: IntoFuture<Future=IF, Item=I, Error=E>,
          IF: Future<Item=I, Error=E> + Send + 'static,
          I: Send + 'static,
          E: Send + 'static,
{
    //Future is created on the calling thread, only its execution is handed to runtime.
    let future = runner().into_future();
    block_on(future)
}

///Run a future to completion on the Tokio runtime.
///
///This runs the given future on the runtime, blocking until it is complete,
///and yielding its resolved result.
///Any tasks or timers which the future spawns internally will be executed on the runtime.
///
///## Note
///
///It must not be used from within async context
pub fn block_on<F: Send + 'static + Future<Item=I, Error=E>, I: Send + 'static, E: Send + 'static>(future: F) -> Result<F::Item, F::Error> {
    match GLOBAL_GUARD.load(Ordering::Acquire) {
        INITIALIZED => unsafe { match TOKIO.as_mut() {
            Some(runtime) => runtime.block_on(future),
            None => unreach!(),
        }},
        _ => panic!(RUNTIME_NOT_AVAIL)
    }
}

///Spawns future on runtime's event loop.
pub fn spawn<F: Future<Item=(), Error=()> + 'static + Send>(fut: F) {
    match GLOBAL_GUARD.load(Ordering::Acquire) {
        INITIALIZED => unsafe { match TOKIO.as_mut() {
            Some(runtime) => {
                runtime.spawn(fut);
            },
            None => unreach!(),
        }},
        _ => panic!(RUNTIME_NOT_AVAIL)
    }
}

//Every sendable `'static` future with sendable output can be executed on the global runtime.
impl<F> AutoRuntime for F
where
    F: Future + Send + 'static,
    F::Item: Send + 'static,
    F::Error: Send + 'static,
{
    //Blocks on the future using this module's `block_on`.
    #[inline]
    fn finish(self) -> Result<Self::Item, Self::Error> {
        block_on(self)
    }

    //Forwards the future to this module's free `spawn` function.
    #[inline]
    fn spawn(self) where Self: Future<Item=(), Error=()> {
        spawn(self)
    }
}