1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
use crate::{thread::ThreadHandle, Result};
use async_task::Runnable;
use crossbeam_channel::{Receiver, Sender};
use parking_lot::Mutex;
use std::{
    cell::RefCell,
    fmt,
    future::Future,
    ops::Deref,
    pin::Pin,
    sync::Arc,
    thread::Builder as ThreadBuilder,
};

// Per-thread flag: starts out `false` and is set to `true` by the worker
// threads started in `DefaultExecutor::maybe_spawn_thread`.
thread_local!(static LAPIN_EXECUTOR_THREAD: RefCell<bool> = RefCell::new(false));

/// Returns `true` when the calling thread is one of the worker threads started
/// by a [`DefaultExecutor`], `false` otherwise.
pub(crate) fn within_executor() -> bool {
    LAPIN_EXECUTOR_THREAD.with(|executor_thread| *executor_thread.borrow())
}

/// Something able to run boxed, `Send` futures that produce `()`.
pub trait Executor: std::fmt::Debug + Send + Sync {
    /// Hands the future `f` over to the executor.
    ///
    /// # Errors
    ///
    /// Returns an error when the executor cannot accept the future.
    fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()>;
}

/// Lets a shared, type-erased executor be used wherever an [`Executor`] is expected.
impl Executor for Arc<dyn Executor> {
    fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()> {
        // Forward to the executor behind the `Arc`.
        self.deref().spawn(f)
    }
}

/// Thread-backed [`Executor`] feeding tasks to worker threads through a channel.
///
/// Clones share the same channel and the same list of worker threads. The
/// workers are asked to stop when the last clone is dropped.
#[derive(Clone)]
pub struct DefaultExecutor {
    // Producer side of the task queue; `None` tells one worker to stop.
    sender: Sender<Option<Runnable>>,
    // Consumer side of the task queue, cloned into every worker thread.
    receiver: Receiver<Option<Runnable>>,
    // Handles of the worker threads started so far, shared between clones.
    threads: Arc<Mutex<Vec<ThreadHandle>>>,
    // Upper bound on the number of worker threads.
    max_threads: usize,
}

impl DefaultExecutor {
    /// Creates an executor that will start at most `max_threads` worker threads.
    ///
    /// No thread is started here; workers are started on demand by
    /// [`Executor::spawn`].
    pub fn new(max_threads: usize) -> Self {
        let (sender, receiver) = crossbeam_channel::unbounded();
        let threads = Default::default();
        Self {
            sender,
            receiver,
            threads,
            max_threads,
        }
    }

    /// Starts one more worker thread unless `max_threads` have already been started.
    ///
    /// The worker is named `executor <n>`, marks itself through the
    /// thread-local flag read by [`within_executor`], then runs the tasks it
    /// receives until it gets a `None` or the receive fails.
    ///
    /// # Errors
    ///
    /// Returns an error when the thread cannot be spawned.
    pub(crate) fn maybe_spawn_thread(&self) -> Result<()> {
        let mut threads = self.threads.lock();
        // 1-based index the new worker would get.
        let id = threads.len() + 1;
        if id <= self.max_threads {
            let receiver = self.receiver.clone();
            threads.push(ThreadHandle::new(
                ThreadBuilder::new()
                    .name(format!("executor {}", id))
                    .spawn(move || {
                        LAPIN_EXECUTOR_THREAD
                            .with(|executor_thread| *executor_thread.borrow_mut() = true);
                        // Stops on the `None` sentinel or on a receive error.
                        while let Ok(Some(runnable)) = receiver.recv() {
                            runnable.run();
                        }
                        Ok(())
                    })?,
            ));
        }
        Ok(())
    }
}

impl Default for DefaultExecutor {
    /// Builds an executor limited to a single worker thread.
    fn default() -> Self {
        Self::new(1)
    }
}

impl fmt::Debug for DefaultExecutor {
    /// Prints only `max_threads`; the channel ends and thread handles are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DefaultExecutor")
            .field("max_threads", &self.max_threads)
            .finish()
    }
}

impl Executor for DefaultExecutor {
    /// Queues `f` for execution on the worker threads, starting one more
    /// worker first if the limit has not been reached.
    ///
    /// The task is detached: nothing is returned to await its completion.
    ///
    /// # Errors
    ///
    /// Returns an error when a needed worker thread cannot be spawned.
    ///
    /// # Panics
    ///
    /// Scheduling the task panics with `executor failed` if it cannot be sent
    /// on the channel. This applies to the initial scheduling done here and to
    /// every later rescheduling of the task.
    fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()> {
        self.maybe_spawn_thread()?;
        let sender = self.sender.clone();
        // Invoked each time the task has to be (re)scheduled.
        let schedule = move |runnable| sender.send(Some(runnable)).expect("executor failed");
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        task.detach();
        Ok(())
    }
}

impl Drop for DefaultExecutor {
    /// Stops and waits for the worker threads when the last clone goes away.
    fn drop(&mut self) {
        // Clones share the channel and the workers, so only the last handle
        // may shut them down. Otherwise dropping one clone would stop the
        // workers under the remaining ones while `maybe_spawn_thread` still
        // counts the stopped workers against `max_threads`.
        //
        // The count is a snapshot: if the last two clones are dropped
        // concurrently, both may see a count of two and skip the shutdown, in
        // which case the workers are not joined here.
        if Arc::strong_count(&self.threads) > 1 {
            return;
        }
        // `try_lock` rather than `lock`: never block inside `drop`.
        if let Some(threads) = self.threads.try_lock() {
            // One `None` per worker: each worker consumes exactly one and stops.
            for _ in threads.iter() {
                let _ = self.sender.send(None);
            }
            for thread in threads.iter() {
                // Skip the handle that `is_current()` reports as the calling
                // thread, so a worker never waits on itself.
                if !thread.is_current() {
                    let _ = thread.wait("executor");
                }
            }
        }
    }
}