fast_able/fast_thread_pool/
pool.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
use crate::stock_pool::StockPool;
use core_affinity::CoreId;
use crossbeam::{atomic::AtomicCell, queue};
use spin::Mutex;
use std::{fmt::Debug, fs, io::Write, sync::Arc, thread};

use super::TaskExecutor;

/// On-disk record of the core ids previous pools have bound to; `ThreadPool::new`
/// reads it to continue binding where the last pool stopped and appends its own
/// bindings. Removed by [`init`] at process start.
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";

/// Resets the on-disk core-affinity record so this process starts core
/// binding from scratch. Call once at startup, before building any pool.
pub fn init() {
    let _ = fs::remove_file(FILE_CORE_AFFINITY);
    warn!("thread_mod init; remove_file core_affinity: {:?}", FILE_CORE_AFFINITY);
}

#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let core_ids = num_cpus::get();
    let core_physical = num_cpus::get_physical();
    if core_ids / core_physical == 2 {
        warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
        2
    } else {
        1
    }
}

/// Stride between successive core indexes when binding worker threads.
/// Without the `deal_physical_cpu` feature, every logical core is used.
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    1
}

/// Thread pool with two sets of core-pinned worker threads.
///
/// - `threads_share`: the default shared pool; tasks submitted via
///   [`ThreadPool::spawn`] go here, picked by stock code modulo pool size.
/// - `threads_fast`: the dedicated high-performance pool, used via
///   [`ThreadPool::spawn_fast`] / [`ThreadPool::spawn_is_fast`].
///
/// Pool sizes are fractions of the CPU core count, chosen at construction
/// (see [`ThreadPool::new`]). Once a stock's task runs on a fast thread,
/// later tasks for the same stock are routed to that thread; new stocks are
/// assigned round-robin. Cores are enumerated via `core_affinity`.
pub struct ThreadPool {
    /// Shared worker threads (default submission target).
    pub threads_share: Vec<TaskExecutor>,
    /// Dedicated fast-mode worker threads.
    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,

    /// Round-robin cursor used when assigning a fast thread (starts at -1).
    pub switch_thread_index: Mutex<i32>,

    /// Fast mode: which fast-thread index each stock code is assigned to.
    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,

    /// Per-stock lock preventing tasks of the same stock from running in
    /// parallel.
    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
}

// pub struct Pto<T: Debug + Default>(AtomicCell<T>);
// impl<T: Debug + Default> Deref for Pto<T> {
//     type Target = AtomicCell<T>;
//     fn deref(&self) -> &Self::Target {
//         &self.0
//     }
// }
// impl<T: Debug + Default + Copy> Debug for Pto<T> {
//     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
//         f.write_str(&format!("{:?}", self.load()))
//     }
// }
// impl<T: Debug + Default> Default for Pto<T> {
//     fn default() -> Self {
//         Pto(AtomicCell::new(T::default()))
//     }
// }

impl ThreadPool {
    pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
        let cpu_num = core_affinity::get_core_ids()
            .unwrap_or_else(|| vec![])
            .len();

        let skip = get_core_skip();
        let cpu_num = cpu_num / skip;
        let num = cpu_num / cpu_fraction_share;

        let core_num = core_affinity::get_core_ids().unwrap_or_else(|| {
            warn!("获取cpu核心数失败");
            vec![]
        });
        let mut current_core: i32 = core_num.len() as i32;
        let max_core = current_core;

        // 读取之前的绑核信息
        _ = fs::File::create_new(FILE_CORE_AFFINITY);
        let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
            .expect("open core_affinity file read_to_string error");
        println!("old_cpu_num: {}", old_cpu_num);
        if !old_cpu_num.is_empty() {
            let old_cpu_num = old_cpu_num.replace("\n", ",");
            let old_cpu_num = old_cpu_num.split(',').collect::<Vec<_>>();
            if let Some(old_cpu_num) = old_cpu_num.last() {
                if let Ok(old_cpu_num) = old_cpu_num.parse::<i32>() {
                    current_core = old_cpu_num;
                }
            }
        }
        let skip = get_core_skip() as i32;
        let mut bind_cores = vec![];

        debug!("threads_share cpu count: {}", num);
        let mut threads_共享 = Vec::with_capacity(num);
        for _ in 0..num {
            current_core -= skip;
            if current_core < 0 {
                current_core = max_core - 1;
            }
            let core = core_num
                .get(current_core as usize)
                .map(|x| x.clone())
                .unwrap_or_else(|| {
                    warn!("获取cpu核心数失败");
                    core_affinity::CoreId { id: 0 }
                });
            bind_cores.push(core.id.to_string());
            threads_共享.push(TaskExecutor::new(core));
        }

        if cpu_fraction_fast == 0 {
            cpu_fraction_fast = cpu_num;
        }
        let mut num = cpu_num / cpu_fraction_fast;
        if num == 0 {
            num = 1;
        }
        debug!("theads_fraction_fast cpu count: {}", num);
        let theads_高性能模式 = crate::vec::SyncVec::with_capacity(num);

        for _ in 0..num {
            current_core -= skip;
            if current_core < 0 {
                current_core = max_core;
            }

            let core = core_num
                .get(current_core as usize)
                .map(|x| x.clone())
                .unwrap_or_else(|| {
                    warn!("获取cpu核心数失败");
                    core_affinity::CoreId { id: 0 }
                });
            bind_cores.push(core.id.to_string());

            theads_高性能模式.push(TaskExecutor::new(core));
        }

        debug!("fast_thread_pool_bind_cores: {:?}", bind_cores);
        // fs::write(FILE_CORE_AFFINITY, bind_cores.join(","))
        //     .expect("write core_affinity file write_all error");

        use std::fs::OpenOptions;
        use std::io::Write;

        {
            let mut file = OpenOptions::new()
                .append(true)
                .open(FILE_CORE_AFFINITY)
                .unwrap();
            // let _ = writeln!(file, "aaa");
            if !old_cpu_num.is_empty() {
                let _ = file.write_all("\n".as_bytes());
            }
            let _ = file.write_all(bind_cores.join(",").as_bytes());
            file.flush().expect("ThreadPoolLite flush error");
        }

        // std::env::set_var("fast_thread_pool_bind_cores", bind_cores.join(","));

        let r = ThreadPool {
            threads_share: threads_共享,
            threads_fast: theads_高性能模式,
            // 高性能模式_任务数最少的线程: 0.into(),
            threads_fast_idx: StockPool::new(),
            // 高性能模式_记录_任务数: StockPool::new(),
            stock_lock: StockPool::new(),
            switch_thread_index: (-1).into(),
            // core_num,
            // current_core: -1.into(),
        };

        // let r1 = r.clone();
        // std::thread::spawn(move || loop {
        //     r1.loop_任务数最少的线程();
        //     std::thread::sleep(std::time::Duration::from_millis(10));
        // });
        r
    }

    /// Picks the fast-pool thread index for stock `i7`, round-robin.
    ///
    /// Walks the shared cursor `switch_thread_index` downwards (wrapping to
    /// `len - 1`) so that concurrent callers spread across distinct fast
    /// threads. The lock is only try-acquired: if another caller holds it,
    /// this returns [`IdxStatus::Discard`] with a fallback index derived from
    /// `i7` instead of blocking the hot path. On success the chosen index is
    /// cached in `threads_fast_idx[i7]` for reuse by later spawns and
    /// returned as [`IdxStatus::Remember`].
    ///
    /// NOTE(review): despite the name, this no longer searches for the thread
    /// with the fewest queued tasks — that earlier implementation was removed
    /// in favor of this lock-guarded round-robin.
    pub fn count_task_min(&self, i7: i32) -> IdxStatus {
        let len = self.threads_fast.len();

        // Never block: a contended lock yields a deterministic fallback index
        // based on the stock code, which the caller must not cache.
        let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
            Some(guard) => guard,
            None => return IdxStatus::Discard(i7 as usize % len),
        };

        // Decrement with wrap-around (the cursor starts at -1).
        if *min_index == 0 || *min_index == -1 {
            *min_index = len as i32 - 1;
        } else {
            *min_index -= 1;
        }

        // Cache the assignment so later spawns for this stock reuse it.
        let r = *min_index as usize;
        self.threads_fast_idx[i7].store(Some(r));
        IdxStatus::Remember(r)
    }

    /// Submits a task for stock `i7` on the shared pool.
    ///
    /// The executor is chosen by `i7` modulo the shared-thread count; a
    /// per-stock spin lock keeps tasks of the same stock from running in
    /// parallel across threads.
    #[inline(always)]
    pub fn spawn<F>(&self, i7: i32, f: F)
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        let stock_lock = self.stock_lock[i7].clone();
        // `rem_euclid` keeps the index non-negative even for a negative stock
        // code (plain `%` would yield a negative remainder, and the `as usize`
        // cast would then index far out of bounds).
        let index = i7.rem_euclid(self.threads_share.len() as i32) as usize;
        self.threads_share[index].spawn(move |_core| {
            // Serialize tasks belonging to the same stock.
            let guard = stock_lock.lock();
            f();
            drop(guard);
        });
    }

    /// Submits a task for stock `i7` on the fast (dedicated) pool.
    ///
    /// If the stock already has a fast thread assigned in `threads_fast_idx`,
    /// that thread is reused; otherwise `count_task_min` assigns the next one
    /// round-robin. A per-stock spin lock keeps tasks of the same stock from
    /// running in parallel.
    #[inline(always)]
    pub fn spawn_fast<F>(&self, i7: i32, f: F)
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        // Index newly assigned by count_task_min, or -1 when a cached or
        // discarded index was used (only drives the debug log below).
        let mut on_fast_idx = -1;

        // Without task dispatching: reuse the cached thread index for this
        // stock, or ask count_task_min for the next round-robin index.
        #[cfg(not(feature = "thread_dispatch"))]
        let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
            let min = self.count_task_min(i7);
            let idx = min.get_idx();
            if let IdxStatus::Remember(idx) = &min {
                on_fast_idx = *idx as i32;
            }
            idx
        });

        // With task dispatching: keep using the cached thread while its
        // backlog stays below 1000 queued tasks; once it backs up (or no
        // thread is cached), re-assign via count_task_min.
        #[cfg(feature = "thread_dispatch")]
        let thread_idx = match self.threads_fast_idx[i7].load() {
            Some(i) if self.threads_fast[i].count.load() < 1000 => i,
            _ => {
                let min = self.count_task_min(i7);
                let idx = min.get_idx();
                if let IdxStatus::Remember(idx) = &min {
                    on_fast_idx = *idx as i32;
                }
                idx
            }
        };

        // Submit the task to the chosen fast thread, serialized per stock.
        let lock = self.stock_lock[i7].clone();
        self.threads_fast[thread_idx].spawn(move |core| {
            let lock_v = lock.lock();
            // print!(" {i7} theads_fast: {thread_idx} ");
            f();
            drop(lock_v);
            if on_fast_idx != -1 {
                debug!("on_fast thread; i7: {i7}, cpu: {core}");
            }
        });
    }

    /// Dispatches a task for stock `i7` to either the fast pool or the
    /// shared pool, depending on `is_fast`.
    #[inline(always)]
    pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        match is_fast {
            true => self.spawn_fast(i7, f),
            false => self.spawn(i7, f),
        }
    }
}

/// Result of a fast-thread assignment from `ThreadPool::count_task_min`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IdxStatus {
    /// Index assigned under the cursor lock; it has been cached for reuse.
    Remember(usize),
    /// Lock was contended; this fallback index must not be cached.
    Discard(usize),
}

impl IdxStatus {
    /// The thread index carried by either variant.
    pub fn get_idx(&self) -> usize {
        match self {
            IdxStatus::Remember(idx) | IdxStatus::Discard(idx) => *idx,
        }
    }
}

/// Manual throughput/latency benchmark for the pool.
///
/// Runs forever, so it is `#[ignore]`d to keep `cargo test` from hanging;
/// launch it explicitly with `cargo test -- --ignored _test_pool`.
#[test]
#[ignore = "infinite benchmark loop; run manually with `cargo test -- --ignored`"]
fn _test_pool() {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();

    init();
    let pool = ThreadPool::new(2, 4);
    std::thread::sleep(std::time::Duration::from_millis(200));

    // Shared counters: tasks completed, summed queueing latency (µs), and
    // number of tasks whose latency exceeded 100 µs.
    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    // Reporter thread: every 3 s log the window's stats and reset counters.
    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || loop {
        std::thread::sleep(std::time::Duration::from_secs(3));
        // fetch_and(0) atomically reads and zeroes each counter.
        let count = count_c.fetch_and(0);
        let elapsed_total = elapsed_total_c.fetch_and(0);
        let elapsed_exp = elapsed_exp_c.fetch_and(0);
        info!(
            "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
            count,
            elapsed_total,
            // `.max(1)` avoids a divide-by-zero panic when no task finished
            // during the window.
            elapsed_total / count.max(1),
            elapsed_exp,
            elapsed_exp as f64 / count.max(1) as f64 * 10000.0,
        );
    });

    // Producer loop: bursts of 100 tasks across two stock codes, then a
    // short pause. Each task records its queue-to-run latency.
    loop {
        for i in 0..100 {
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            // Every third task targets one stock, the rest another, so both
            // the cached-thread and round-robin paths get exercised.
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
    // (The unreachable trailing sleep that followed the loop was removed.)
}