// parallel_processor/execution_manager/thread_pool.rs

use crate::execution_manager::execution_context::{ExecutionContext, PoolAllocMode};
use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
use crate::execution_manager::objects_pool::PoolObjectTrait;
use parking_lot::Mutex;
use std::any::TypeId;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};
10pub struct ExecThreadPool {
11    context: Arc<ExecutionContext>,
12    executors: Mutex<Vec<tokio::task::JoinHandle<()>>>,
13    runtime: Runtime,
14}
16pub struct ExecutorsHandle<E: AsyncExecutor>(PhantomData<E>);
17impl<E: AsyncExecutor> Clone for ExecutorsHandle<E> {
18    fn clone(&self) -> Self {
19        *self
20    }
21}
22impl<E: AsyncExecutor> Copy for ExecutorsHandle<E> {}
24impl ExecThreadPool {
25    pub fn new(context: &Arc<ExecutionContext>, threads_count: usize, name: &str) -> Self {
26        Self {
27            context: context.clone(),
28            executors: Mutex::new(Vec::new()),
29            runtime: Builder::new_multi_thread()
30                .thread_name(name)
31                .worker_threads(threads_count)
32                .build()
33                .unwrap(),
34        }
35    }
36
37    pub fn register_executors<E: AsyncExecutor>(
38        &self,
39        count: usize,
40        pool_alloc_mode: PoolAllocMode,
41        pool_init_data: <E::OutputPacket as PoolObjectTrait>::InitData,
42        global_params: &Arc<E::GlobalParams>,
43    ) -> ExecutorsHandle<E> {
44        self.context
45            .register_executor_type::<E>(count, pool_alloc_mode, pool_init_data);
46
47        let addresses_channel = self
48            .context
49            .waiting_addresses
50            .lock()
51            .get(&TypeId::of::<E>())
52            .unwrap()
53            .clone();
54
55        let mut executors = self.executors.lock();
56
57        for _ in 0..count {
58            let context = self.context.clone();
59            let addresses_channel = addresses_channel.clone();
60            let global_params = global_params.clone();
61
62            executors.push(self.runtime.spawn(async move {
63                async {
64                    let context_ = context.clone();
65                    let mut executor = E::new();
66                    let sem_lock = context_.start_semaphore.acquire().await;
67                    let memory_tracker = context.memory_tracker.get_executor_instance();
68                    executor
69                        .async_executor_main(
70                            &global_params,
71                            ExecutorReceiver {
72                                context,
73                                addresses_channel,
74                                _phantom: PhantomData,
75                            },
76                            memory_tracker,
77                        )
78                        .await;
79
80                    drop(sem_lock);
81                    context_.wait_condvar.notify_all();
82                }
83                .await;
84            }));
85        }
86        ExecutorsHandle(PhantomData)
87    }
88
89    // pub fn debug_print_memory(&self) {
90    //     self.work_scheduler.print_debug_memory()
91    // }
92    //
93    // pub fn debug_print_queue(&self) {
94    //     self.work_scheduler.print_debug_executors()
95    // }
96    //
97}