fast-able 1.20.2

The world's martial arts are fast and unbreakable; 天下武功 唯快不破
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
use crate::fast_thread_pool::{get_core_skip, CORES};
use crate::stock_pool::StockPool;
use crossbeam::atomic::AtomicCell;
use spin::Mutex;
use std::{
    fs,
    sync::{Arc, LazyLock},
};

use super::TaskExecutor;

/// Path of the marker file that records whether the host is a realtime system.
///
/// [`init`] deletes this file and, when asked to, rewrites it with the text
/// `realtime_system`; [`set_core_affinity_and_realtime`] reads it to decide whether
/// realtime scheduling may be requested.
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";

/// Resets the realtime marker file [`FILE_CORE_AFFINITY`].
///
/// Any existing marker file is removed first (a failed removal is ignored). When
/// `is_realtime_system` is `true`, the file is then recreated containing
/// `realtime_system`, which is the text `set_core_affinity_and_realtime` looks for.
///
/// # Panics
/// Panics if `is_realtime_system` is `true` and the marker file cannot be written.
pub fn init(is_realtime_system: bool) {
    // Best effort: the file may simply not exist yet.
    let _ = fs::remove_file(FILE_CORE_AFFINITY);
    warn!(
        "thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
        FILE_CORE_AFFINITY
    );

    if !is_realtime_system {
        return;
    }
    fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
}


/// Set core affinity
/// Set real-time kernel
/// Priority (1-99, higher is more prioritized); Input -1 to disable
/// 设置绑核
/// 设置实时内核
/// 优先级(1-99,越高越优先); 输入 -1 时, 不开启
pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
    let ALL_CORES = CORES.clone();

    let core = ALL_CORES.iter().find(|&&core| core.id == core_u);

    let core = match core {
        Some(core) => core,
        None => match ALL_CORES.last() {
            Some(core) => core,
            None => return false,
        },
    };

    let b = core_affinity::set_for_current(core.clone());

    #[allow(unused_mut)]
    let mut b2 = true;

    #[allow(unused_mut)]
    let mut realtime_msg = "";

    static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
        std::fs::read_to_string(FILE_CORE_AFFINITY)
            .unwrap_or_else(|_| "".to_string())
            .contains("realtime_system")
    });

    #[cfg(target_os = "linux")]
    if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
        use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
        let param = sched_param {
            sched_priority: _realtime,
        };
        unsafe {
            // SCHED_FIFO, SCHED_RR
            let ret = sched_setscheduler(0, SCHED_RR, &param);
            if ret != 0 {
                /*
                let errno = *libc::__errno_location();
                let err_msg2 = match errno {
                    EPERM => "权限不足 (EPERM) - 检查:\n1. getcap -v ./your_program 输出是否包含 cap_sys_nice\n2. 文件所有者是否正确\n3. 是否需要重新登录使组权限生效",
                    EINVAL => "参数无效 (EINVAL) - 可能原因:\n1. 优先级超出范围(1-99)\n2. 调度策略无效",
                    ESRCH => "指定的进程不存在 (ESRCH)",
                    _ => "未知错误",
                };
                realtime_msg = format!(
                    "set_realtime: false, 设置实时内核调度器失败: errno={errno}, UID={}, EUID={}, 设置优先级={}, 错误信息={}",
                    libc::getuid(),
                    libc::geteuid(),
                    _realtime,
                    err_msg2
                );
                */
                realtime_msg = ", set_realtime: false";
                b2 = false;
            } else {
                realtime_msg = ", set_realtime: true";
            }
        }
    }

    debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
    b && b2
}

/// Thread pool that dispatches per-stock tasks onto two groups of [`TaskExecutor`]s.
///
/// * The *shared* group (`threads_share`) serves [`ThreadPool::spawn`]; the executor is
///   chosen from the stock code `i7` modulo the group size.
/// * The *fast* group (`threads_fast`) serves [`ThreadPool::spawn_fast`]; a stock code keeps
///   using the executor index remembered in `threads_fast_idx`, and a new index is handed
///   out round-robin by [`ThreadPool::count_task_min`] when none is remembered.
///
/// Every spawned task takes the per-stock lock from `stock_lock` before running, so two
/// tasks of the same stock code never execute at the same time.
///
/// The size of each group is decided in [`ThreadPool::new`] from the core count and the two
/// fraction arguments.
///
/// NOTE(review): earlier documentation described fixed defaults (1/4 of the cores for the
/// shared group, at most 1/5 for the fast group) and a "fewest tasks within 10 ms"
/// heuristic. Neither is visible in the current code — confirm before relying on it.
pub struct ThreadPool {
    /// Executors used by [`ThreadPool::spawn`].
    pub threads_share: Vec<TaskExecutor>,
    /// Executors used by [`ThreadPool::spawn_fast`].
    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,

    /// Round-robin cursor into `threads_fast`, advanced by [`ThreadPool::count_task_min`];
    /// initialised to `-1`.
    pub switch_thread_index: Mutex<i32>,

    // Removed field: pub <fast-mode thread with the fewest tasks>: AtomicCell<usize>,
    /// In fast mode, remembers which fast executor the tasks of a stock code run on.
    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,

    // Removed field (was: number of tasks currently running per stock code in fast mode):
    // <fast-mode task counter>: StockPool<Arc<()>>,

    /// Prevents tasks of the same stock code from running in parallel.
    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
    // Removed fields: number of cores
    // core_num: Vec<core_affinity::CoreId>,
    // current_core: AtomicCell<i32>,
}

// pub struct Pto<T: Debug + Default>(AtomicCell<T>);
// impl<T: Debug + Default> Deref for Pto<T> {
//     type Target = AtomicCell<T>;
//     fn deref(&self) -> &Self::Target {
//         &self.0
//     }
// }
// impl<T: Debug + Default + Copy> Debug for Pto<T> {
//     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
//         f.write_str(&format!("{:?}", self.load()))
//     }
// }
// impl<T: Debug + Default> Default for Pto<T> {
//     fn default() -> Self {
//         Pto(AtomicCell::new(T::default()))
//     }
// }

impl ThreadPool {
    pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
        let cpu_num = CORES.len();

        let skip = get_core_skip();
        let cpu_num = cpu_num / skip;
        let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;

        // 修复: 确保线程数至少为1
        if cpu_fraction_share_num == 0 {
            cpu_fraction_share_num = 1;
            warn!(
                "cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
            );
        }

        // 使用 use_last_core2 获取共享线程使用的核心
        debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
        let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
        
        // 获取共享线程使用的核心
        let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
        
        // 创建共享线程
        for core_id in &share_cores {
            let core = core_affinity::CoreId { id: *core_id };
            threads_共享.push(TaskExecutor::new(core, -1));
        }
        
        // 创建快速线程
        if cpu_fraction_fast == 0 {
            cpu_fraction_fast = cpu_num;
        }
        let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
        if cpu_fraction_fast_num == 0 {
            cpu_fraction_fast_num = 1;
            warn!(
                "cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
            );
        }
        debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
        let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);

        // 获取快速线程使用的核心
        let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
        
        // 创建快速线程
        for core_id in &fast_cores {
            let core = core_affinity::CoreId { id: *core_id };
            theads_高性能模式.push(TaskExecutor::new(core, 49));
        }
        
        debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);

        let r = ThreadPool {
            threads_share: threads_共享,
            threads_fast: theads_高性能模式,
            // 高性能模式_任务数最少的线程: 0.into(),
            threads_fast_idx: StockPool::new_default(),
            // 高性能模式_记录_任务数: StockPool::new(),
            stock_lock: StockPool::new_default(),
            switch_thread_index: (-1).into(),
            // core_num,
            // current_core: -1.into(),
        };

        // let r1 = r.clone();
        // std::thread::spawn(move || loop {
        //     r1.loop_任务数最少的线程();
        //     std::thread::sleep(std::time::Duration::from_millis(10));
        // });
        r
    }

    /// Picks the fast executor that tasks for stock code `i7` should use next.
    ///
    /// Executors are handed out round-robin: the cursor in `switch_thread_index` walks the
    /// indices downwards and wraps from `0` (or the initial `-1`) back to `len - 1`. The
    /// cursor lock is only *tried*, never waited for:
    ///
    /// * lock acquired - the cursor moves, the chosen index is stored in
    ///   `threads_fast_idx[i7]` and returned as [`IdxStatus::Remember`];
    /// * lock busy - `i7 as usize % len` is returned as [`IdxStatus::Discard`] and nothing
    ///   is stored, so the caller uses that index for this one task only.
    ///
    /// Despite its name, this method does not inspect per-executor task counts.
    ///
    /// # Panics
    /// Panics on the contended path if `threads_fast` is empty (remainder by zero).
    pub fn count_task_min(&self, i7: i32) -> IdxStatus {
        let thread_count = self.threads_fast.len();

        // Never block the submitting thread on the cursor; fall back to a derived index.
        let Some(mut cursor) = self.switch_thread_index.try_lock() else {
            return IdxStatus::Discard(i7 as usize % thread_count);
        };

        // Step the cursor backwards, wrapping around at the start.
        *cursor = match *cursor {
            0 | -1 => thread_count as i32 - 1,
            current => current - 1,
        };

        // Remember the choice while the cursor lock is still held.
        let picked = *cursor as usize;
        self.threads_fast_idx[i7].store(Some(picked));
        IdxStatus::Remember(picked)
    }

    /// Queues `f` on a shared executor chosen from the stock code `i7`.
    ///
    /// The executor is `threads_share[i7 mod threads_share.len()]`. Before `f` runs, the task
    /// takes the per-stock lock `stock_lock[i7]`, so closures submitted for the same `i7`
    /// never run concurrently.
    ///
    /// # Panics
    /// Panics if `threads_share` is empty (remainder by zero).
    #[inline(always)]
    pub fn spawn<F>(&self, i7: i32, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let stock_lock = self.stock_lock[i7].clone();
        // `rem_euclid` keeps the result in `0..len` even for a negative `i7`; plain `%` would
        // yield a negative remainder that becomes an out-of-range index after the cast.
        let index = i7.rem_euclid(self.threads_share.len() as i32) as usize;
        self.threads_share[index].spawn(move |_core| {
            // Held until the closure returns, i.e. for the whole run of `f`.
            let _guard = stock_lock.lock();
            f();
        });
    }

    #[inline(always)]
    pub fn spawn_fast<F>(&self, i7: i32, f: F)
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        let mut on_fast_idx = -1;

        // 找最少任务数的线程
        #[cfg(not(feature = "thread_dispatch"))]
        let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
            let min = self.count_task_min(i7);
            let idx = min.get_idx();
            if let IdxStatus::Remember(idx) = &min {
                on_fast_idx = *idx as i32;
            }
            idx
        });

        // 如果当前任务堆积小于5个, 则使用当前线程; 否则就去找最少任务数的线程
        // 有任务调度的线程方法, 如果有任务堆积则通过找任务数最少的线程来提交任务
        #[cfg(feature = "thread_dispatch")]
        let thread_idx = match self.threads_fast_idx[i7].load() {
            Some(i) if self.threads_fast.get_uncheck(i).count.load() < 1000 => i,
            _ => {
                let min = self.count_task_min(i7);
                let idx = min.get_idx();
                if let IdxStatus::Remember(idx) = &min {
                    on_fast_idx = *idx as i32;
                }
                idx
            }
        };

        // 提交任务
        let lock = self.stock_lock[i7].clone();
        self.threads_fast.get_uncheck(thread_idx).spawn(move |_core| {
            let lock_v = lock.lock();
            // print!(" {i7} theads_fast: {thread_idx} ");
            f();
            drop(lock_v);
            if on_fast_idx != -1 {
                // debug!("on_fast thread; i7: {i7}, cpu: {_core}");
            }
        });
    }

    /// Submits `f` for stock code `i7` to the fast group when `is_fast` is `true`, and to
    /// the shared group otherwise.
    #[inline(always)]
    pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        if !is_fast {
            return self.spawn(i7, f);
        }
        self.spawn_fast(i7, f)
    }
}

/// Outcome of choosing an executor index for a task.
///
/// Both variants carry a usable index; they differ in whether the choice should be kept
/// for later tasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IdxStatus {
    /// The index was chosen normally and should be remembered.
    Remember(usize),
    /// The index is a one-off fallback and should not be remembered.
    Discard(usize),
}

impl IdxStatus {
    /// Returns the carried index, regardless of the variant.
    pub fn get_idx(&self) -> usize {
        match self {
            IdxStatus::Remember(idx) | IdxStatus::Discard(idx) => *idx,
        }
    }
}

/// Manual stress benchmark for `ThreadPool::spawn_is_fast` on the fast path.
///
/// It submits batches of 100 tiny tasks for two stock codes forever, while a reporter thread
/// logs throughput and latency figures every 3 seconds. Because it never returns, it is
/// ignored by default; run it explicitly and stop it with Ctrl-C.
#[test]
#[ignore = "endless stress benchmark; run with `cargo test _test_pool -- --ignored --nocapture`"]
fn _test_pool() {
    use crate::fast_thread_pool::*;

    // SAFETY: no thread has been started by this test yet. NOTE(review): other tests running
    // in parallel in the same process could still touch the environment concurrently.
    unsafe { std::env::set_var("RUST_LOG", "debug") };
    env_logger::init();

    init(true);
    let _core = use_last_core("use_last_core");
    info!("use_last_core_多个核心: {:?}", _core);
    let pool = ThreadPool::new(2, 4);
    // Claim further cores after the pool was built, as the original benchmark did.
    let _core = use_last_core2("use_last_core_多个核心2", 5);
    let _core2 = use_last_core("use_last_core2");
    let _core3 = use_last_core("use_last_core3");
    let _core4 = use_last_core("use_last_core4");
    let _core5 = use_last_core("use_last_core5");
    let _core6 = use_last_core("use_last_core6");
    std::thread::sleep(std::time::Duration::from_millis(200));

    // Per-window counters: tasks run, summed latency (µs), tasks slower than 100 µs.
    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || {
        loop {
            std::thread::sleep(std::time::Duration::from_secs(3));
            // `swap(0)` reads and resets each counter in one atomic step.
            let count = count_c.swap(0);
            let elapsed_total = elapsed_total_c.swap(0);
            let elapsed_exp = elapsed_exp_c.swap(0);
            // A window without any finished task must not divide by zero.
            let (average, slow_ratio) = if count > 0 {
                (
                    elapsed_total / count,
                    elapsed_exp as f64 / count as f64 * 10000.0,
                )
            } else {
                (0, 0.0)
            };
            info!(
                "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
                count,
                elapsed_total,
                average,
                elapsed_exp,
                slow_ratio,
            );
        }
    });

    loop {
        for i in 0..100 {
            // Latency is measured from submission to the start of the task body.
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
}