parallel_processor/execution_manager/
mod.rs1pub mod async_channel;
2pub mod execution_context;
3pub mod executor;
4pub mod executor_address;
5pub mod memory_tracker;
6pub mod objects_pool;
7pub mod packet;
8pub mod thread_pool;
9pub mod units_io;
10
11#[cfg(test)]
12mod tests {
13 use crate::execution_manager::execution_context::{ExecutionContext, PoolAllocMode};
14 use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
15 use crate::execution_manager::memory_tracker::MemoryTracker;
16 use crate::execution_manager::objects_pool::PoolObjectTrait;
17 use crate::execution_manager::packet::PacketTrait;
18 use crate::execution_manager::thread_pool::ExecThreadPool;
19 use crate::execution_manager::units_io::{ExecutorInput, ExecutorInputAddressMode};
20 use std::future::Future;
21 use std::ops::Deref;
22 use std::sync::Arc;
23 use std::time::Duration;
24
25 struct TestExecutor {}
26
27 impl PoolObjectTrait for usize {
28 type InitData = ();
29
30 fn allocate_new(_init_data: &Self::InitData) -> Self {
31 0
32 }
33
34 fn reset(&mut self) {}
35 }
36
37 impl PacketTrait for usize {
38 fn get_size(&self) -> usize {
39 0
40 }
41 }
42
43 impl AsyncExecutor for TestExecutor {
44 type InputPacket = usize;
45 type OutputPacket = usize;
46 type GlobalParams = ();
47 type InitData = ();
48 type AsyncExecutorFuture<'a> = impl Future<Output = ()> + 'a;
49
50 fn new() -> Self {
51 Self {}
52 }
53
54 fn async_executor_main<'a>(
55 &'a mut self,
56 _global_params: &'a Self::GlobalParams,
57 mut receiver: ExecutorReceiver<Self>,
58 _memory_tracker: MemoryTracker<Self>,
59 ) -> impl Future<Output = ()> + 'a {
60 async move {
61 while let Ok((addr, _init_data)) = receiver.obtain_address().await {
62 let pool = addr.pool_alloc_await(1000).await;
63
64 while let Some(packet) = addr.receive_packet().await {
65 let mut x = *packet.deref();
66 for i in 0..100000000 {
67 x += i * x + i;
68 }
69 crate::log_info!("X: {}", x);
70
71 tokio::time::sleep(Duration::from_millis(1000)).await;
72
73 drop(packet);
74 for exec in 0..2 {
75 let address = TestExecutor::generate_new_address(());
76 addr.declare_addresses(vec![address.clone()], 0);
77
78 let mut packet = pool.alloc_packet().await;
79 *packet = exec + x;
80 crate::log_info!("Push packet {}", *packet.deref() * 2 + exec);
81 addr.packet_send(address.clone(), packet);
82 }
83 }
84 }
85 crate::log_info!("Ended executor!");
86 }
87 }
88 }
89
90 #[test]
91 #[ignore]
92 fn test_executors() {
93 let context = ExecutionContext::new();
94
95 let readers_pool = ExecThreadPool::new(&context, 16, "readers-pool");
96
97 readers_pool.register_executors::<TestExecutor>(
98 640000,
99 PoolAllocMode::Shared { capacity: 1024 },
100 (),
101 &Arc::new(()),
102 );
103
104 let strings = vec![1]; let mut test_input =
107 ExecutorInput::from_iter(strings.into_iter(), ExecutorInputAddressMode::Multiple);
108
109 test_input.set_output_executor::<TestExecutor>(&context, (), 0);
110
111 loop {
113 std::thread::sleep(Duration::from_millis(1000));
114 }
115 }
116}