1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241

use core::{future::Future};


use std::rc::Rc;
#[cfg(feature = "smol_executor")]
use std::sync::Arc;

mod join_handle;
pub use join_handle::*;

mod agnostic_executor;
pub use agnostic_executor::*;

use ExecutorInner::*;
use ExecutorInnerHandle::*;

mod local_agnostic_executor;
pub use local_agnostic_executor::*;


/// It lets you build an AgnosticExecutorManager for a concrete executor
pub struct AgnosticExecutorBuilder {}

impl AgnosticExecutorBuilder {

    /// A manager for a default multi-threaded Tokio executor
    #[cfg(feature = "tokio_executor")]
    pub fn use_tokio_executor(self) -> AgnosticExecutorManager {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Error creating tokio runtime");
        self.use_tokio_executor_with_runtime(rt)
    }

    /// A manager for a provided Tokio executor
    #[cfg(feature = "tokio_executor")]
    pub fn use_tokio_executor_with_runtime(self, rt: tokio::runtime::Runtime) -> AgnosticExecutorManager {
        let handle = rt.handle().clone();
        AgnosticExecutorManager { 
            inner_handle: TokioHandle(handle),
            inner_runtime: TokioRuntime(rt),
            local_inner_runtime: LocalExecutorInnerRuntime::TokioRuntime(tokio::task::LocalSet::new()),
            local_inner_handle: LocalExecutorInnerHandle::TokioHandle,
            finish_callback: None
        }
    }

    /// A manager for an Async Std executor
    #[cfg(feature = "async_std_executor")]
    pub fn use_async_std_executor(self) -> AgnosticExecutorManager {
        AgnosticExecutorManager { 
            inner_handle: AsyncStdHandle,
            inner_runtime: AsyncStdRuntime,
            local_inner_runtime: LocalExecutorInnerRuntime::AsyncStdRuntime,
            local_inner_handle: LocalExecutorInnerHandle::AsyncStdHandle,
            finish_callback: None
        }
    }

    /// A manager for a Smol executor.
    /// If num_threads is not provided, it defaults default to the number of logical cores.
    #[cfg(feature = "smol_executor")]
    pub fn use_smol_executor(self, num_threads: Option<usize>) -> AgnosticExecutorManager {
        let rt = Arc::new(async_executor::Executor::new());
        let handle = rt.clone();
        let num_threads = num_threads.unwrap_or(num_cpus::get());
        let local = Rc::new(async_executor::LocalExecutor::new());
        AgnosticExecutorManager { 
            inner_handle: SmolHandle(handle),
            inner_runtime: SmolRuntime(rt, num_threads),
            local_inner_runtime: LocalExecutorInnerRuntime::SmolRuntime(local.clone()),
            local_inner_handle: LocalExecutorInnerHandle::SmolHandle(local),
            finish_callback: None
        }
    }

    /// A manager for a default Threadpool executor from the futures crate.
    #[cfg(feature = "futures_executor")]
    pub fn use_futures_executor(self) -> AgnosticExecutorManager {
        let rt = futures::executor::ThreadPool::new().expect("Error creating the futures threadpool");
        self.use_futures_executor_with_runtime(rt)
    }

    /// A manager for a provided executor from the futures crate.
    #[cfg(feature = "futures_executor")]
    pub fn use_futures_executor_with_runtime(self, rt: futures::executor::ThreadPool) -> AgnosticExecutorManager {
        let handle = rt.clone();
        let local = futures::executor::LocalPool::new();
        let local_spawner = local.spawner();
        AgnosticExecutorManager { 
            inner_handle: FuturesHandle(handle),
            inner_runtime: FuturesRuntime(rt),
            local_inner_runtime: LocalExecutorInnerRuntime::FuturesRuntime(local),
            local_inner_handle: LocalExecutorInnerHandle::FuturesHandle(local_spawner),
            finish_callback: None
        }
    }

    /// A manager for a wasm executor from the wasm_bindgen_futures crate
    #[cfg(feature = "wasm_bindgen_executor")]
    pub fn use_wasm_bindgen_executor(self) -> AgnosticExecutorManager {
        AgnosticExecutorManager { 
            inner_handle: WasmBindgenHandle,
            inner_runtime: WasmBindgenRuntime,
            local_inner_runtime: LocalExecutorInnerRuntime::WasmBindgenRuntime,
            local_inner_handle: LocalExecutorInnerHandle::WasmBindgenHandle,
            finish_callback: None
        }
    }
}

use once_cell::sync::OnceCell;
static GLOBAL_EXECUTOR: OnceCell<AgnosticExecutor>  = OnceCell::new();

/// An AgnosticExecutorManager is configured on creation for a specific executor and it allows you get the the general and local executor, set it as global executor, and of course, start the executor.
pub struct AgnosticExecutorManager {
    inner_runtime: ExecutorInner,
    inner_handle: ExecutorInnerHandle,
    local_inner_runtime: LocalExecutorInnerRuntime,
    local_inner_handle: LocalExecutorInnerHandle,
    finish_callback: Option<Box<dyn FnOnce() -> () + 'static>>
}

impl AgnosticExecutorManager {
    /// Get the executor of this manager as an AgnosticExecutor.
    /// This is needed if you need to spawn new tasks, and it be easily stored, cloned and send across threads to have it available where ever you need to spawn a new tasks or interact with the executor. 
    pub fn get_executor(&self) -> AgnosticExecutor {
        AgnosticExecutor { inner: self.inner_handle.clone() }
    }

    /// Get the local executor of this manager as a LocalAgnosticExecutor.
    /// A local executor is similar to the general executor but it allows to spawn tasks that are not send.
    /// The drawback is that, even tough you can store and clone a LocalAgnosticExecutor, you cannot send it to other threads.
    pub fn get_local_executor(&mut self) -> LocalAgnosticExecutor {
        LocalAgnosticExecutor { inner: self.local_inner_handle.clone() }
    }

    /// Sets up a callback to be called when the executor finishes. It can only be called once.
    /// This is needed because on wasm the start() call might finish before it's future completes and we might need a way to detect completion.
    pub fn on_finish<F>(&mut self, cb: F) where F: FnOnce() -> () + 'static {
        self.finish_callback = Some(Box::new(cb));
    }

    /// Sets this executor as the global executor to be used with the global crate functions get_global_executor, spawn and spawn_blocking.
    /// You still need to start the executor after setting it as global.
    /// This can only be called once.
    pub fn set_as_global(&self) {
        GLOBAL_EXECUTOR.set(self.get_executor()).expect("Global executor already set");
    }

    /// Start the executor with the provided future. 
    /// This future doesn't need to be Send, but it needs to be 'static. You can use async move {...} to achieve this if needed.
    /// Note that in wasm the call might finish before the future has completely executed due to the non-blocking nature of the environment, so don't depend on this.
    /// With the other executors you can depend on the fact that start is blocking, but the on_finish callback is called anyway.
    pub fn start<F>(mut self, future: F) where F: Future<Output = ()> + 'static {
        let cb = self.finish_callback.take();
        let finish_cb = || {
            if let Some(cb) = cb {
                cb();
            }
        };

        match (self.inner_runtime, self.local_inner_runtime) {
            #[cfg(feature = "tokio_executor")]
            (TokioRuntime(runtime), LocalExecutorInnerRuntime::TokioRuntime(localset)) => {
                runtime.block_on(async move {
                    localset.run_until(future).await;
                });
                finish_cb();
            },
            #[cfg(feature = "async_std_executor")]
            (AsyncStdRuntime, _) => {
                async_std::task::block_on(future);
                finish_cb();
            },
            #[cfg(feature = "smol_executor")]
            (SmolRuntime(executor, num_threads),  LocalExecutorInnerRuntime::SmolRuntime(local)) => {
                if num_threads > 1 {
                    let (signal, shutdown) = async_channel::unbounded::<()>();
                    easy_parallel::Parallel::new()
                        .each(0..num_threads, |_| futures_lite::future::block_on(executor.run(shutdown.recv())))
                        .finish(|| {
                            futures_lite::future::block_on(async {
                                local.run(future).await;
                                drop(signal);
                            });
                            finish_cb();
                        });
                } else {
                    futures_lite::future::block_on(local.run(future));
                    finish_cb();
                }
            },
            #[cfg(feature = "futures_executor")]
            (FuturesRuntime(_), LocalExecutorInnerRuntime::FuturesRuntime(mut local)) => {
                local.run_until(future);
                finish_cb();
            },
            #[cfg(feature = "wasm_bindgen_executor")]
            (WasmBindgenRuntime, _) => {
                wasm_bindgen_futures::spawn_local(async move {
                    future.await;
                    finish_cb();
                });
            },
            _ => { panic!("Couldn't start the executor."); }
        }
    }
}

/// The base function to create a new concrete AgnosticExecutor.
/// Use the builder to specify the concrete executor, and then the manager to get access the executor and start it.
pub fn new_agnostic_executor() -> AgnosticExecutorBuilder {
    AgnosticExecutorBuilder {}
}

/// Gets the executor set as the global executor.
/// It might panic if no executor is set.
pub fn get_global_executor() -> &'static AgnosticExecutor {
    GLOBAL_EXECUTOR.get().expect("No global executor set")
}

/// Spawn a future on the global executor
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
{
    get_global_executor().spawn(future)
}

/// Runs the provided closure on the global executor, and when possible, it does it in a way that doesn't block concurrent tasks.
pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    get_global_executor().spawn_blocking(task)
}