1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
use futures::{future::Future, stream::Stream, sync::mpsc}; use log::warn; use tokio::executor::{DefaultExecutor, Executor}; use tokio::runtime::TaskExecutor; use crate::error::*; #[derive(Clone)] pub enum ExecutorFlavour { Runtime, TokioTaskExecutor(TaskExecutor), } impl ExecutorFlavour { pub fn execute<F>(&self, task: F) -> ReoolResult<()> where F: Future<Item = (), Error = ()> + Send + 'static, { match self { ExecutorFlavour::Runtime => { DefaultExecutor::current() .spawn(Box::new(task)) .map_err(|err| { warn!("default executor failed to execute a task: {:?}", err); ReoolError::with_cause(ErrorKind::TaskExecution, err) }) } ExecutorFlavour::TokioTaskExecutor(executor) => { executor.spawn(Box::new(task)); Ok(()) } } } pub fn spawn_unbounded<S>(&self, stream: S) -> mpsc::SpawnHandle<S::Item, S::Error> where S: Stream + Send + 'static, S::Item: Send, S::Error: Send, { match self { ExecutorFlavour::Runtime => mpsc::spawn_unbounded(stream, &DefaultExecutor::current()), ExecutorFlavour::TokioTaskExecutor(executor) => mpsc::spawn_unbounded(stream, executor), } } } impl From<TaskExecutor> for ExecutorFlavour { fn from(exec: TaskExecutor) -> Self { ExecutorFlavour::TokioTaskExecutor(exec) } } impl From<()> for ExecutorFlavour { fn from(_: ()) -> Self { ExecutorFlavour::Runtime } }