parallel_processor/execution_manager/
thread_pool.rs1use crate::execution_manager::execution_context::{ExecutionContext, PoolAllocMode};
2use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
3use crate::execution_manager::objects_pool::PoolObjectTrait;
4use parking_lot::Mutex;
5use std::any::TypeId;
6use std::marker::PhantomData;
7use std::sync::Arc;
8use tokio::runtime::{Builder, Runtime};
9
10pub struct ExecThreadPool {
11 context: Arc<ExecutionContext>,
12 executors: Mutex<Vec<tokio::task::JoinHandle<()>>>,
13 runtime: Runtime,
14}
15
16pub struct ExecutorsHandle<E: AsyncExecutor>(PhantomData<E>);
17impl<E: AsyncExecutor> Clone for ExecutorsHandle<E> {
18 fn clone(&self) -> Self {
19 *self
20 }
21}
22impl<E: AsyncExecutor> Copy for ExecutorsHandle<E> {}
23
24impl ExecThreadPool {
25 pub fn new(context: &Arc<ExecutionContext>, threads_count: usize, name: &str) -> Self {
26 Self {
27 context: context.clone(),
28 executors: Mutex::new(Vec::new()),
29 runtime: Builder::new_multi_thread()
30 .thread_name(name)
31 .worker_threads(threads_count)
32 .build()
33 .unwrap(),
34 }
35 }
36
37 pub fn register_executors<E: AsyncExecutor>(
38 &self,
39 count: usize,
40 pool_alloc_mode: PoolAllocMode,
41 pool_init_data: <E::OutputPacket as PoolObjectTrait>::InitData,
42 global_params: &Arc<E::GlobalParams>,
43 ) -> ExecutorsHandle<E> {
44 self.context
45 .register_executor_type::<E>(count, pool_alloc_mode, pool_init_data);
46
47 let addresses_channel = self
48 .context
49 .waiting_addresses
50 .lock()
51 .get(&TypeId::of::<E>())
52 .unwrap()
53 .clone();
54
55 let mut executors = self.executors.lock();
56
57 for _ in 0..count {
58 let context = self.context.clone();
59 let addresses_channel = addresses_channel.clone();
60 let global_params = global_params.clone();
61
62 executors.push(self.runtime.spawn(async move {
63 async {
64 let context_ = context.clone();
65 let mut executor = E::new();
66 let sem_lock = context_.start_semaphore.acquire().await;
67 let memory_tracker = context.memory_tracker.get_executor_instance();
68 executor
69 .async_executor_main(
70 &global_params,
71 ExecutorReceiver {
72 context,
73 addresses_channel,
74 _phantom: PhantomData,
75 },
76 memory_tracker,
77 )
78 .await;
79
80 drop(sem_lock);
81 context_.wait_condvar.notify_all();
82 }
83 .await;
84 }));
85 }
86 ExecutorsHandle(PhantomData)
87 }
88
89 }