1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
use crate::{internal_rpc::InternalRPCHandle, Promise, Result}; use async_task::Task; use crossbeam_channel::{Receiver, Sender}; use parking_lot::Mutex; use std::{ fmt, future::Future, ops::Deref, pin::Pin, sync::Arc, thread::{Builder as ThreadBuilder, JoinHandle}, }; pub trait Executor: std::fmt::Debug + Send + Sync { fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()>; } pub(crate) trait ExecutorExt: Executor { fn spawn_internal(&self, promise: Promise<()>, internal_rpc: InternalRPCHandle) -> Result<()> { if let Some(res) = promise.try_wait() { if let Err(err) = res.clone() { internal_rpc.set_connection_error(err); } res } else { self.spawn(Box::pin(async move { if let Err(err) = promise.await { internal_rpc.set_connection_error(err); } })) } } } impl Executor for Arc<dyn Executor> { fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()> { self.deref().spawn(f) } } impl ExecutorExt for Arc<dyn Executor> {} #[derive(Clone)] pub struct DefaultExecutor { sender: Sender<Task<()>>, receiver: Receiver<Task<()>>, threads: Arc<Mutex<Vec<JoinHandle<()>>>>, max_threads: usize, } impl DefaultExecutor { pub fn new(max_threads: usize) -> Self { let (sender, receiver) = crossbeam_channel::unbounded(); let threads = Default::default(); Self { sender, receiver, threads, max_threads, } } pub(crate) fn maybe_spawn_thread(&self) -> Result<()> { let mut threads = self.threads.lock(); let id = threads.len() + 1; if id <= self.max_threads { let receiver = self.receiver.clone(); threads.push( ThreadBuilder::new() .name(format!("executor {}", id)) .spawn(move || { for task in receiver { task.run(); } })?, ); } Ok(()) } } impl Default for DefaultExecutor { fn default() -> Self { Self::new(1) } } impl fmt::Debug for DefaultExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DefaultExecutor") .field("max_threads", &self.max_threads) .finish() } } impl Executor for DefaultExecutor { fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<()> { self.maybe_spawn_thread()?; let sender = self.sender.clone(); let schedule = move |task| sender.send(task).expect("executor failed"); let (task, _) = async_task::spawn(f, schedule, ()); task.schedule(); Ok(()) } }