fast_able/
cache.rs

1use std::{sync::Arc, thread::JoinHandle, time::Duration};
2
3use crossbeam::channel::{bounded, Receiver};
4
5// use kanal::{bounded, Receiver};
6
7use crate::vec::{ReadGuard, SyncVec};
8
9/// Pre-generated cache package
10/// 提前生成的缓存包
11pub struct CachePackage<T: Default> {
12    list: Arc<SyncVec<T>>,
13    receiver_index: Receiver<usize>,
14    _thread_join: JoinHandle<()>,
15}
16
17impl<T: Default + 'static> CachePackage<T> {
18    /// Create a new cache package
19    /// 创建一个新的缓存包
20    pub fn new(cap: usize, mem_size: usize) -> Self {
21        let list = Arc::new(SyncVec::new());
22        let list_c = list.clone();
23        let (tx, rx) = bounded(cap);
24        // let (tx, rx) = crossbeam::channel::bounded(cap);
25        if mem_size < 1024 {
26            debug!("cache_package mem_size: {} bt", mem_size);
27        } else if mem_size < 1024 * 1024 {
28            debug!("cache_package mem_size: {:.2} kb", mem_size as f64 / 1024.0);
29        } else {
30            debug!(
31                "cache_package mem_size: {:.2} mb",
32                mem_size as f64 / 1024.0 / 1024.0
33            );
34        }
35        let _thread_join = std::thread::Builder::new()
36            .stack_size(mem_size)
37            .spawn(move || loop {
38                let i = list_c.push_return(T::default());
39                if let Err(_e) = tx.send(i) {
40                    error!("cache_package tx.send error[{i}]: {_e}");
41                    #[cfg(debug_assertions)]
42                    println!("cache_package tx.send error[{i}]: {_e}");
43                    std::thread::sleep(std::time::Duration::from_secs(1));
44                }
45            })
46            .unwrap();
47        Self { list, receiver_index: rx, _thread_join }
48    }
49
50    /// Get the next item from the cache
51    /// 从缓存中获取下一个项目
52    #[inline(always)]
53    pub fn next(&self) -> ReadGuard<'_, T> {
54        let i = self.receiver_index.recv().unwrap_or_else(|_e| {
55            let i = self.list.push_return(T::default());
56            warn!("cache_package.try_recv error[{i}]: {_e}");
57            i
58        });
59        self.list.get_uncheck(i)
60    }
61}
62
63/*
64mod test {
65
66    /// 提前生成的缓存,在使用申请缓存即可, 必竟要生成一个3千元素的数组需要100微秒
67    /// 提前生成10个
68    /// 使用: 行情_缓存.try_recv()
69    static 行情_缓存: StaticType<CachePackage<List_高频数据<QTY_前后封单_行情端, 3000>>> = StaticType::new();
70
71    pub fn init_static() -> IResult {
72        行情_缓存.init_call(|| CachePackage::new(3001));
73        Ok(())
74    }
75
76    #[test]
77    fn test_static_data() {
78        std::thread::Builder::new()
79            .stack_size(1024 * 1204 * 1024)
80            .spawn(|| {
81                init_static().unwrap();
82                std::thread::sleep(Duration::from_secs(3));
83
84                let count = 2000;
85                let mut time_elapse = Duration::from_micros(0);
86
87                for _ in 0..count {
88                    let start = std::time::Instant::now();
89                    let list = 行情_缓存.next();
90                    list.push(Default::default());
91                    list.push(Default::default());
92                    time_elapse += start.elapsed();
93                }
94
95                println!("计算次数 {count}, 平均耗时: {:?}", time_elapse / count);
96            })
97            .unwrap()
98            .join()
99            .unwrap();
100    }
101}
102*/