java_threadpool/ThreadPool_rs/
mod.rs1use std::cell::RefCell;
2use std::sync::mpsc::{Receiver, Sender, SyncSender};
3use std::sync::{mpsc, Arc, LazyLock, Mutex};
4use std::thread;
5
6mod Executor_rs;
7mod Message_rs;
8pub mod Future_rs;
9
10use Executor_rs::Executor;
11use Message_rs::Message;
12use Future_rs::Future;
13
14
15static mut CORS_THREAD: Mutex<usize> = Mutex::new(0);
16static mut NON_CORE_THREAD: Mutex<usize> = Mutex::new(0);
17pub struct ThreadPool {
18 sync_sender: SyncSender<Message>,
19 core_pool_size: usize,
20 maximum_pool_size: usize,
21 maximum_queue: usize,
22}
23
24type Job = Box<dyn Fn() + Send + 'static>;
25
26impl ThreadPool {
27 pub fn new(core_pool_size: usize, maximum_pool_size: usize, maximum_queue: usize) -> Self {
28
29 assert!(maximum_pool_size > core_pool_size);
30 assert!(maximum_queue > (core_pool_size + maximum_pool_size));
31
32
33 let (sync_sender, receiver) = mpsc::sync_channel::<Message>(maximum_queue);
34
35 let arc_mutex_receiver = Arc::new(Mutex::new(receiver));
36
37
38 for _ in 0..core_pool_size {
39 let clone_receiver = Arc::clone(&arc_mutex_receiver);
40
41 Executor::new(clone_receiver);
42 }
43
44
45 thread::spawn(move || {
46 loop {
47 unsafe {
48 let mut thread_size = CORS_THREAD.lock().unwrap();
49 let mut non_thread_size = NON_CORE_THREAD.lock().unwrap();
50
51 if (*thread_size) > core_pool_size && (*non_thread_size) < (maximum_pool_size - core_pool_size) {
52 drop(thread_size);
53
54 let non_receiver = Arc::clone(&arc_mutex_receiver);
55
56 let mute = non_receiver.try_lock();
58 match mute {
59 Ok(rece) => {
60 match rece.try_recv() {
61 Ok(message) => {
62 drop(rece);
63
64 match message {
65 Message::Mess_job((closure, sender)) => {
66 println!("非核心线程.........");
67 thread::spawn(move || {
68 closure();
69 sender.send(String::from("end")).unwrap();
70 unsafe {
71 let mut thread_size = CORS_THREAD.lock().unwrap();
72 (*thread_size) = ((*thread_size) - 1);
73
74 let mut non_thread_size = NON_CORE_THREAD.lock().unwrap();
75 (*non_thread_size) = (*non_thread_size) - 1;
76 }
77 });
78 }
79
80 Message::Break => { break }
81 }
82 }
83
84 Err(e) => ()
85 }
86 }
87 Err(e) => ()
88 }
89 (*non_thread_size) += 1;
90 }
91 }
92 }
93 });
94
95 return ThreadPool {
96 sync_sender: sync_sender,
97 core_pool_size: core_pool_size,
98 maximum_pool_size: maximum_pool_size,
99 maximum_queue: maximum_queue,
100 };
101 }
102
103
104 pub fn executor<F>(&self, closure: F) -> Future
105 where
106 F: Fn() + Send + 'static,
107 {
108 unsafe {
109 let mut thread_size = CORS_THREAD.lock().unwrap();
110
111
112 let (sender, receiver) = mpsc::channel::<String>();
113
114 self.sync_sender.send(Message::Mess_job((Box::new(closure), sender))).unwrap();
115 let future = Future { receiver: receiver };
116
117
118 *thread_size = (*thread_size) + 1;
119 return future;
120 }
121 }
122
123
124 pub fn shutdown(&self) {
125 for _ in 0..(self.core_pool_size + 1) {
126 self.sync_sender.send(Message::Break).unwrap();
127 }
128 }
129}
130
131impl Drop for ThreadPool {
132 fn drop(&mut self) {
133 for _ in 0..(self.core_pool_size + 1) {
134 self.sync_sender.send(Message::Break).unwrap();
135 }
136 }
137}