fast_able/fast_thread_pool/
lite.rs

1use std::{fs, io::Write, sync::Arc};
2
3use crossbeam::atomic::AtomicCell;
4
5use crate::fast_thread_pool::FILE_CORE_AFFINITY;
6
7use super::{get_core_skip, TaskExecutor};
8
9/// 简易线程池
10/// 用于快速提交任务
11/// 只有一个线程, 只绑定一个核心
12pub struct ThreadPoolLite {
13    pub thread: TaskExecutor,
14}
15
16/// 使用最后一个闲置核心
17pub fn use_last_core(use_name: &str) -> usize {
18    let core = core_affinity::get_core_ids().unwrap_or_else(|| {
19        warn!("获取cpu核心数失败");
20        vec![]
21    });
22
23    // 获得之前已经绑定的核心
24    _ = fs::File::create_new(FILE_CORE_AFFINITY);
25    let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
26        .expect("open core_affinity file read_to_string error");
27    let old_cpu_num = old_cpu_num
28        .replace("\n", ",")
29        .split(',')
30        .filter_map(|x| x.parse::<usize>().ok())
31        .collect::<Vec<_>>();
32
33    let old_last = old_cpu_num.last().map(|x| *x).unwrap_or_else(|| 0);
34    let skip = get_core_skip();
35    let use_core = if old_last > 1 {
36        old_cpu_num
37            .last()
38            .map(|x| core_affinity::CoreId { id: *x - skip })
39            .unwrap_or_else(|| {
40                warn!("获取cpu核心数失败");
41                core_affinity::CoreId { id: 0 }
42            })
43    } else {
44        core.last().map(|x| x.clone()).unwrap_or_else(|| {
45            warn!("获取cpu核心数失败");
46            core_affinity::CoreId { id: 0 }
47        })
48    };
49
50    {
51        warn!("绑核 {use_core:?}");
52        let mut file = fs::OpenOptions::new()
53            .append(true)
54            .open(FILE_CORE_AFFINITY)
55            .unwrap();
56        // let _ = writeln!(file, "aaa");
57        if !old_cpu_num.is_empty() {
58            let _ = file.write_all("\n".as_bytes());
59        }
60        let _ = file.write_all(format!("{use_name},{}", use_core.id).as_bytes());
61        file.flush().expect("ThreadPoolLite flush error");
62    }
63
64    use_core.id
65}
66
67impl ThreadPoolLite {
68    pub fn new() -> ThreadPoolLite {
69        let use_core = use_last_core("thread_lite");
70        let r = ThreadPoolLite {
71            thread: TaskExecutor::new(core_affinity::CoreId { id: use_core }, 49),
72        };
73        r
74    }
75
76    pub fn spawn<F>(&self, f: F)
77    where
78        F: FnOnce(),
79        F: Send + 'static,
80    {
81        self.thread.spawn(|_| f());
82    }
83}
84
85pub fn _test_thread_lite(test_count: u128) {
86    let pool = ThreadPoolLite::new();
87    std::thread::sleep(std::time::Duration::from_millis(200));
88    let com_time = Arc::new(AtomicCell::new(0_u128));
89    for _ in 0..test_count {
90        let now = std::time::Instant::now();
91        let com_time = com_time.clone();
92        pool.spawn(move || {
93            // println!("run _test_thread_lite i: {}", i);
94            let el = now.elapsed().as_nanos();
95            com_time.fetch_add(el);
96        });
97    }
98    println!("------------------------thread_lite 任务提交完成------------------------");
99    std::thread::sleep(std::time::Duration::from_secs(1));
100    println!(
101        "thread_lite 测试结果: 线程开启平均耗时: {:.3} micros",
102        com_time.load() as f64 / test_count as f64 / 1000.0
103    );
104}