// parallel_processor/execution_manager.rs
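//! Execution manager: submodules implementing the asynchronous executor
//! framework (executor definitions, notifications, object and packet pools,
//! packet channels, scheduling, and the executor thread pool).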

pub mod executor;
pub mod notifier;
pub mod objects_pool;
pub mod packet;
pub mod packets_channel;
pub mod scheduler;
pub mod thread_pool;

#[cfg(test)]
mod tests {

    use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
    use crate::execution_manager::objects_pool::PoolObjectTrait;
    use crate::execution_manager::packet::{PacketTrait, PacketsPool};
    use crate::execution_manager::scheduler::{run_blocking_op, Scheduler};
    use crate::execution_manager::thread_pool::{ExecThreadPool, ExecutorsHandle};
    use crate::set_logger_function;
    use std::num::Wrapping;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

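    /// Minimal executor used by the tests below; its input and output packets
    /// are plain `usize` values.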
    struct TestExecutor {}

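    // Allow bare `usize` values to live in an object pool: freshly allocated
    // entries start at zero and need no cleanup when recycled.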
    impl PoolObjectTrait for usize {
        type InitData = ();

        fn allocate_new(_init_data: &Self::InitData) -> Self {
            0
        }

        fn reset(&mut self) {}
    }

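    // Packets report a size of zero, so any size-based accounting treats the
    // test packets as weightless.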
    impl PacketTrait for usize {
        fn get_size(&self) -> usize {
            0
        }
    }

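    // Test executor: for every packet it receives, it spawns three workers;
    // each worker does some CPU-bound busy work, creates a new address and
    // forwards a doubled value to it, stopping once the value exceeds 64.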
    impl AsyncExecutor for TestExecutor {
        type InputPacket = usize;
        type OutputPacket = usize;
        type GlobalParams = AtomicUsize;
        type InitData = ExecutorsHandle<Self>;
        const ALLOW_PARALLEL_ADDRESS_EXECUTION: bool = false;

        fn new() -> Self {
            Self {}
        }

        fn executor_main<'a>(
            &'a mut self,
            _global_params: &'a Self::GlobalParams,
            mut receiver: ExecutorReceiver<Self>,
            // _memory_tracker: MemoryTracker<Self>,
        ) {
            static INDEX: AtomicUsize = AtomicUsize::new(0);
            let index = INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            let pool = PacketsPool::<usize>::new(1024, ());
            while let Ok(addr) = receiver.obtain_address() {
                // let pool = addr.get_pool();
                while let Some(packet) = addr.receive_packet() {
                    // std::thread::sleep(Duration::from_millis(10));
                    // crate::log_info!("v: {}", x);
                    crate::log_info!("X: {}", *packet);

                    let pvalue = *packet;

                    addr.spawn_executors(3, |_| {
                        run_blocking_op(|| {});
                        // println!("ENABLED count: {}", scheduler.current_running());

                        let address = addr
                            .get_init_data()
                            .create_new_address(Arc::new(addr.get_init_data().clone()), false);

                        let mut x = Wrapping(pvalue);
                        for i in 1..100000 {
                            let i = Wrapping(i as usize);
                            x += i * i + x;
                        }
                        // println!("FINISHED count: {}", scheduler.current_running());

                        if x.0 == usize::MAX {
                            crate::log_info!("Overflow detected in executor {}", index);
                        }

                        let mut npacket = pool.alloc_packet();
                        *npacket = pvalue * 2;

                        if *npacket > 64 {
                            return;
                        }

                        address.send_packet(npacket);
                    });

                    drop(packet);
                }
            }
            println!("Finished {}!", index);
        }
    }

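    // Smoke test for the executor machinery: it builds a scheduler and a
    // thread pool of `TestExecutor`s, feeds a single seed packet, then drops
    // its handle and joins the pool. Marked `#[ignore]` so it only runs when
    // explicitly requested (e.g. `cargo test -- --ignored`).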
    #[test]
    #[ignore]
    fn test_executors() {
        set_logger_function(|_level, message| {
            println!("{}", message);
        });

        let scheduler = Scheduler::new(16);

        let mut readers_pool = ExecThreadPool::<TestExecutor>::new(16, "readers-pool", true);

        let running_count = Arc::new(AtomicUsize::new(0));

        let handle = readers_pool.start(scheduler, &running_count);

        let inputs = vec![1]; //, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];

        handle.add_input_data(handle.clone(), inputs.into_iter());

        // test_input.set_output_executor::<TestExecutor>(&context, (), 0);

        drop(handle);

        // loop {
        //     // println!(
        //     //     "Running count: {}",
        //     //     running_count.load(std::sync::atomic::Ordering::Relaxed)
        //     // );
        //     std::thread::sleep(Duration::from_millis(1000));
        // }

        // readers_pool
        readers_pool.join();
    }
}