1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
use crate::{Error, Result}; use crossbeam_channel::{Receiver, Sender}; use parking_lot::Mutex; use std::{ sync::Arc, thread::{Builder as ThreadBuilder, JoinHandle}, }; pub trait Executor: std::fmt::Debug + Send + Sync { fn execute(&self, f: Box<dyn FnOnce() + Send>) -> Result<()>; } #[derive(Clone, Debug)] pub struct DefaultExecutor { sender: Sender<Box<dyn FnOnce() + Send>>, receiver: Receiver<Box<dyn FnOnce() + Send>>, threads: Arc<Mutex<Vec<JoinHandle<()>>>>, max_threads: usize, } impl DefaultExecutor { pub fn new(max_threads: usize) -> Arc<Self> { let (sender, receiver) = crossbeam_channel::unbounded(); Arc::new(Self { sender, receiver, threads: Default::default(), max_threads, }) } pub(crate) fn default() -> Arc<Self> { Self::new(1) } } impl DefaultExecutor { pub(crate) fn maybe_spawn_thread(&self) -> Result<()> { let mut threads = self.threads.lock(); let id = threads.len() + 1; if id <= self.max_threads { let receiver = self.receiver.clone(); threads.push( ThreadBuilder::new() .name(format!("executor {}", id)) .spawn(move || { for f in receiver { f(); } }) .map_err(Error::IOError)?, ); } Ok(()) } } impl Executor for DefaultExecutor { fn execute(&self, f: Box<dyn FnOnce() + Send>) -> Result<()> { self.maybe_spawn_thread()?; self.sender.send(f).expect("executor failed"); Ok(()) } }