parallel_processor/
execution_manager.rs1pub mod executor;
2pub mod notifier;
3pub mod objects_pool;
4pub mod packet;
5pub mod packets_channel;
6pub mod scheduler;
7pub mod thread_pool;
8
9#[cfg(test)]
10mod tests {
11
12 use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
13 use crate::execution_manager::objects_pool::PoolObjectTrait;
14 use crate::execution_manager::packet::{PacketTrait, PacketsPool};
15 use crate::execution_manager::scheduler::{run_blocking_op, Scheduler};
16 use crate::execution_manager::thread_pool::{ExecThreadPool, ExecutorsHandle};
17 use crate::set_logger_function;
18 use std::num::Wrapping;
19 use std::sync::atomic::AtomicUsize;
20 use std::sync::Arc;
21
22 struct TestExecutor {}
23
24 impl PoolObjectTrait for usize {
25 type InitData = ();
26
27 fn allocate_new(_init_data: &Self::InitData) -> Self {
28 0
29 }
30
31 fn reset(&mut self) {}
32 }
33
34 impl PacketTrait for usize {
35 fn get_size(&self) -> usize {
36 0
37 }
38 }
39
40 impl AsyncExecutor for TestExecutor {
41 type InputPacket = usize;
42 type OutputPacket = usize;
43 type GlobalParams = AtomicUsize;
44 type InitData = ExecutorsHandle<Self>;
45 const ALLOW_PARALLEL_ADDRESS_EXECUTION: bool = false;
46
47 fn new() -> Self {
48 Self {}
49 }
50
51 fn executor_main<'a>(
52 &'a mut self,
53 _global_params: &'a Self::GlobalParams,
54 mut receiver: ExecutorReceiver<Self>,
55 ) {
57 static INDEX: AtomicUsize = AtomicUsize::new(0);
58 let index = INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
59 let pool = PacketsPool::<usize>::new(1024, ());
60 while let Ok(addr) = receiver.obtain_address() {
61 while let Some(packet) = addr.receive_packet() {
63 crate::log_info!("X: {}", *packet);
66
67 let pvalue = *packet;
68
69 addr.spawn_executors(3, |_| {
70 run_blocking_op(|| {});
71 let address = addr
74 .get_init_data()
75 .create_new_address(Arc::new(addr.get_init_data().clone()), false);
76
77 let mut x = Wrapping(pvalue);
78 for i in 1..100000 {
79 let i = Wrapping(i as usize);
80 x += i * i + x;
81 }
82 if x.0 == usize::MAX {
85 crate::log_info!("Overflow detected in executor {}", index);
86 }
87
88 let mut npacket = pool.alloc_packet();
89 *npacket = 0 + pvalue * 2;
90
91 if *npacket > 64 {
92 return;
93 }
94
95 address.send_packet(npacket);
96 });
97
98 drop(packet);
99 }
100 }
101 println!("Finished {}!", index);
102 }
103 }
104
105 #[test]
106 #[ignore]
107 fn test_executors() {
108 set_logger_function(|_level, message| {
109 println!("{}", message);
110 });
111
112 let scheduler = Scheduler::new(16);
113
114 let mut readers_pool = ExecThreadPool::<TestExecutor>::new(16, "readers-pool", true);
115
116 let running_count = Arc::new(AtomicUsize::new(0));
117
118 let handle = readers_pool.start(scheduler, &running_count);
119
120 let strings = vec![1]; handle.add_input_data(handle.clone(), strings.into_iter());
123
124 drop(handle);
127
128 readers_pool.join();
138 }
139}