java_threadpool/ThreadPool_rs/
mod.rs

1use std::cell::RefCell;
2use std::sync::mpsc::{Receiver, Sender, SyncSender};
3use std::sync::{mpsc, Arc, LazyLock, Mutex};
4use std::thread;
5
6mod Executor_rs;
7mod Message_rs;
8pub mod Future_rs;
9
10use Executor_rs::Executor;
11use Message_rs::Message;
12use Future_rs::Future;
13
14
15static mut CORS_THREAD: Mutex<usize> = Mutex::new(0);
16static mut NON_CORE_THREAD: Mutex<usize> = Mutex::new(0);
17pub struct ThreadPool {
18    sync_sender: SyncSender<Message>,
19    core_pool_size: usize,
20    maximum_pool_size: usize,
21    maximum_queue: usize,
22}
23
24type Job = Box<dyn Fn() + Send + 'static>;
25
26impl ThreadPool {
27    pub fn new(core_pool_size: usize, maximum_pool_size: usize, maximum_queue: usize) -> Self {
28
29        assert!(maximum_pool_size > core_pool_size);
30        assert!(maximum_queue > (core_pool_size + maximum_pool_size));
31
32
33        let (sync_sender, receiver) = mpsc::sync_channel::<Message>(maximum_queue);
34
35        let arc_mutex_receiver = Arc::new(Mutex::new(receiver));
36
37
38        for _ in 0..core_pool_size {
39            let clone_receiver = Arc::clone(&arc_mutex_receiver);
40
41            Executor::new(clone_receiver);
42        }
43
44
45        thread::spawn(move || {
46            loop {
47                unsafe {
48                    let mut thread_size = CORS_THREAD.lock().unwrap();
49                    let mut non_thread_size = NON_CORE_THREAD.lock().unwrap();
50
51                    if (*thread_size) > core_pool_size && (*non_thread_size) < (maximum_pool_size - core_pool_size) {
52                        drop(thread_size);
53
54                        let non_receiver = Arc::clone(&arc_mutex_receiver);
55
56                        // Executor::new_thread(Arc::clone(&arc_mutex_receiver));
57                        let mute = non_receiver.try_lock();
58                        match mute {
59                            Ok(rece) => {
60                                match rece.try_recv() {
61                                    Ok(message) => {
62                                        drop(rece);
63
64                                        match message {
65                                            Message::Mess_job((closure, sender)) => {
66                                                println!("非核心线程.........");
67                                                thread::spawn(move || {
68                                                    closure();
69                                                    sender.send(String::from("end")).unwrap();
70                                                    unsafe {
71                                                        let mut thread_size = CORS_THREAD.lock().unwrap();
72                                                        (*thread_size) = ((*thread_size) - 1);
73
74                                                        let mut non_thread_size = NON_CORE_THREAD.lock().unwrap();
75                                                        (*non_thread_size) = (*non_thread_size) - 1;
76                                                    }
77                                                });
78                                            }
79
80                                            Message::Break => { break }
81                                        }
82                                    }
83
84                                    Err(e) => ()
85                                }
86                            }
87                            Err(e) => ()
88                        }
89                        (*non_thread_size) += 1;
90                    }
91                }
92            }
93        });
94
95        return ThreadPool {
96            sync_sender: sync_sender,
97            core_pool_size: core_pool_size,
98            maximum_pool_size: maximum_pool_size,
99            maximum_queue: maximum_queue,
100        };
101    }
102
103
104    pub fn executor<F>(&self, closure: F) -> Future
105    where
106        F: Fn() + Send + 'static,
107    {
108        unsafe {
109            let mut thread_size = CORS_THREAD.lock().unwrap();
110
111
112            let (sender, receiver) = mpsc::channel::<String>();
113
114            self.sync_sender.send(Message::Mess_job((Box::new(closure), sender))).unwrap();
115            let future = Future { receiver: receiver };
116
117
118            *thread_size = (*thread_size) + 1;
119            return future;
120        }
121    }
122
123
124    pub fn shutdown(&self) {
125        for _ in 0..(self.core_pool_size + 1) {
126            self.sync_sender.send(Message::Break).unwrap();
127        }
128    }
129}
130
131impl Drop for ThreadPool {
132    fn drop(&mut self) {
133        for _ in 0..(self.core_pool_size + 1) {
134            self.sync_sender.send(Message::Break).unwrap();
135        }
136    }
137}