// parallel_processor/execution_manager/mod.rs

1pub mod async_channel;
2pub mod execution_context;
3pub mod executor;
4pub mod executor_address;
5pub mod memory_tracker;
6pub mod objects_pool;
7pub mod packet;
8pub mod thread_pool;
9pub mod units_io;
10
#[cfg(test)]
mod tests {
    use crate::execution_manager::execution_context::{ExecutionContext, PoolAllocMode};
    use crate::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
    use crate::execution_manager::memory_tracker::MemoryTracker;
    use crate::execution_manager::objects_pool::PoolObjectTrait;
    use crate::execution_manager::packet::PacketTrait;
    use crate::execution_manager::thread_pool::ExecThreadPool;
    use crate::execution_manager::units_io::{ExecutorInput, ExecutorInputAddressMode};
    use std::future::Future;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::time::Duration;

    /// Number of iterations of the CPU-bound loop run for every received packet.
    const BUSY_LOOP_ITERATIONS: usize = 100_000_000;

    /// Stateless executor used to exercise the execution manager by hand.
    struct TestExecutor {}

    /// Lets plain `usize` values be handed out by the objects pool in this test.
    impl PoolObjectTrait for usize {
        type InitData = ();

        /// New pooled values always start at zero.
        fn allocate_new(_init_data: &Self::InitData) -> Self {
            0
        }

        /// Nothing to clean up: the value is overwritten before being sent.
        fn reset(&mut self) {}
    }

    /// Lets plain `usize` values travel as packets in this test.
    impl PacketTrait for usize {
        /// The test packets always report a size of zero.
        fn get_size(&self) -> usize {
            0
        }
    }

    /// Burns CPU time starting from `seed` and returns the scrambled value.
    ///
    /// The recurrence `x += i * x + i` grows factorially and exceeds `usize`
    /// after a couple of dozen iterations. Plain `+`/`*` would therefore panic
    /// in builds with overflow checks enabled (the default for tests), so the
    /// arithmetic is explicitly wrapping, which matches what an unchecked
    /// build computes.
    fn busy_work(seed: usize) -> usize {
        (0..BUSY_LOOP_ITERATIONS).fold(seed, |acc: usize, i: usize| {
            acc.wrapping_add(i.wrapping_mul(acc).wrapping_add(i))
        })
    }

    impl AsyncExecutor for TestExecutor {
        type InputPacket = usize;
        type OutputPacket = usize;
        type GlobalParams = ();
        type InitData = ();
        type AsyncExecutorFuture<'a> = impl Future<Output = ()> + 'a;

        fn new() -> Self {
            Self {}
        }

        /// Executor main loop.
        ///
        /// For every address obtained from `receiver` it allocates a pool and
        /// then, for every packet received on that address: runs the busy
        /// loop on the packet value, logs it, sleeps one second and finally
        /// sends two new packets, each to a freshly generated `TestExecutor`
        /// address. The loop ends once `obtain_address` returns an error.
        fn async_executor_main<'a>(
            &'a mut self,
            _global_params: &'a Self::GlobalParams,
            mut receiver: ExecutorReceiver<Self>,
            _memory_tracker: MemoryTracker<Self>,
        ) -> impl Future<Output = ()> + 'a {
            async move {
                while let Ok((addr, _init_data)) = receiver.obtain_address().await {
                    let pool = addr.pool_alloc_await(1000).await;

                    while let Some(packet) = addr.receive_packet().await {
                        let x = busy_work(*packet.deref());
                        crate::log_info!("X: {}", x);

                        tokio::time::sleep(Duration::from_millis(1000)).await;

                        // Release the input packet before allocating the outputs.
                        drop(packet);
                        for exec in 0..2usize {
                            let address = TestExecutor::generate_new_address(());
                            addr.declare_addresses(vec![address.clone()], 0);

                            let mut packet = pool.alloc_packet().await;
                            // `x` is an arbitrary wrapped value, so keep the
                            // follow-up arithmetic wrapping as well.
                            let value = x.wrapping_add(exec);
                            *packet = value;
                            crate::log_info!(
                                "Push packet {}",
                                value.wrapping_mul(2).wrapping_add(exec)
                            );
                            addr.packet_send(address.clone(), packet);
                        }
                    }
                }
                crate::log_info!("Ended executor!");
            }
        }
    }

    /// Manual smoke test for the executor machinery.
    ///
    /// It registers `TestExecutor` on a thread pool, feeds it a single input
    /// value and then sleeps forever so the executors can be observed through
    /// their log output. It never returns by design, hence `#[ignore]`: run it
    /// explicitly and stop it by hand.
    #[test]
    #[ignore]
    fn test_executors() {
        let context = ExecutionContext::new();

        let readers_pool = ExecThreadPool::new(&context, 16, "readers-pool");

        readers_pool.register_executors::<TestExecutor>(
            640000,
            PoolAllocMode::Shared { capacity: 1024 },
            (),
            &Arc::new(()),
        );

        // A single seed value; every processed packet spawns two more.
        let strings = vec![1];

        let mut test_input =
            ExecutorInput::from_iter(strings.into_iter(), ExecutorInputAddressMode::Multiple);

        test_input.set_output_executor::<TestExecutor>(&context, (), 0);

        // Keep the test thread alive while the pool works in the background.
        loop {
            std::thread::sleep(Duration::from_millis(1000));
        }
    }
}