1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use futures::{future::Future, stream::Stream, sync::mpsc};
use log::warn;
use tokio::executor::{DefaultExecutor, Executor};
use tokio::runtime::TaskExecutor;

use crate::error::*;

/// Selects which executor is used to spawn tasks and streams.
#[derive(Clone)]
pub enum ExecutorFlavour {
    /// Spawn on whatever `DefaultExecutor::current()` returns at the time of
    /// the call.
    Runtime,
    /// Spawn on the wrapped `TaskExecutor` handle.
    TokioTaskExecutor(TaskExecutor),
}

impl ExecutorFlavour {
    pub fn execute<F>(&self, task: F) -> ReoolResult<()>
    where
        F: Future<Item = (), Error = ()> + Send + 'static,
    {
        match self {
            ExecutorFlavour::Runtime => {
                DefaultExecutor::current()
                    .spawn(Box::new(task))
                    .map_err(|err| {
                        warn!("default executor failed to execute a task: {:?}", err);
                        ReoolError::with_cause(ErrorKind::TaskExecution, err)
                    })
            }
            ExecutorFlavour::TokioTaskExecutor(executor) => {
                executor.spawn(Box::new(task));
                Ok(())
            }
        }
    }

    pub fn spawn_unbounded<S>(&self, stream: S) -> mpsc::SpawnHandle<S::Item, S::Error>
    where
        S: Stream + Send + 'static,
        S::Item: Send,
        S::Error: Send,
    {
        match self {
            ExecutorFlavour::Runtime => mpsc::spawn_unbounded(stream, &DefaultExecutor::current()),
            ExecutorFlavour::TokioTaskExecutor(executor) => mpsc::spawn_unbounded(stream, executor),
        }
    }
}

/// Wraps a `TaskExecutor` handle in the `TokioTaskExecutor` flavour.
impl From<TaskExecutor> for ExecutorFlavour {
    fn from(task_executor: TaskExecutor) -> Self {
        let flavour = ExecutorFlavour::TokioTaskExecutor(task_executor);
        flavour
    }
}

/// Converts the unit value into the `Runtime` flavour, i.e. "no explicit
/// executor given".
impl From<()> for ExecutorFlavour {
    fn from((): ()) -> Self {
        // The unit value carries no information; it only selects the variant.
        ExecutorFlavour::Runtime
    }
}