radiate_core/
executor.rs

1use crate::thread_pool::{ThreadPool, WaitGroup};
2
3pub enum Executor {
4    Serial,
5    WorkerPool(ThreadPool),
6}
7
8impl Executor {
9    pub fn serial() -> Self {
10        Executor::Serial
11    }
12
13    pub fn worker_pool(num_workers: usize) -> Self {
14        let pool = ThreadPool::new(num_workers);
15        Executor::WorkerPool(pool)
16    }
17
18    pub fn num_workers(&self) -> usize {
19        match self {
20            Executor::Serial => 1,
21            Executor::WorkerPool(pool) => pool.num_workers(),
22        }
23    }
24
25    pub fn execute<F, R>(&self, f: F) -> R
26    where
27        F: FnOnce() -> R + Send + 'static,
28        R: Send + 'static,
29    {
30        match self {
31            Executor::Serial => f(),
32            Executor::WorkerPool(pool) => pool.submit_with_result(f).result(),
33        }
34    }
35
36    pub fn execute_batch<F, R>(&self, f: Vec<F>) -> Vec<R>
37    where
38        F: FnOnce() -> R + Send + 'static,
39        R: Send + 'static,
40    {
41        match self {
42            Executor::Serial => f.into_iter().map(|func| func()).collect(),
43            Executor::WorkerPool(pool) => {
44                let wg = WaitGroup::new();
45                let mut results = Vec::with_capacity(f.len());
46                for job in f {
47                    let wg_clone = wg.guard();
48                    let result = pool.submit_with_result(move || {
49                        let res = job();
50                        drop(wg_clone);
51                        res
52                    });
53                    results.push(result);
54                }
55
56                wg.wait();
57
58                results.into_iter().map(|r| r.result()).collect()
59            }
60        }
61    }
62
63    pub fn submit<F>(&self, f: F)
64    where
65        F: FnOnce() + Send + 'static,
66    {
67        match self {
68            Executor::Serial => f(),
69            Executor::WorkerPool(pool) => pool.submit(f),
70        }
71    }
72
73    pub fn submit_batch<F>(&self, f: Vec<F>)
74    where
75        F: FnOnce() + Send + 'static,
76    {
77        match self {
78            Executor::Serial => {
79                for func in f {
80                    func();
81                }
82            }
83            Executor::WorkerPool(pool) => {
84                let wg = WaitGroup::new();
85                for job in f {
86                    let wg_clone = wg.guard();
87                    pool.submit(move || {
88                        job();
89                        drop(wg_clone);
90                    });
91                }
92                wg.wait();
93            }
94        }
95    }
96}