fast_able/
cache.rs

1use std::{sync::Arc, thread::JoinHandle, time::Duration};
2
3use crossbeam::channel::{bounded, Receiver};
4
5// use kanal::{bounded, Receiver};
6
7use crate::vec::{ReadGuard, SyncVec};
8
9pub struct CachePackage<T: Default> {
10    list: Arc<SyncVec<T>>,
11    receiver_index: Receiver<usize>,
12    _thread_join: JoinHandle<()>,
13}
14
15impl<T: Default + 'static> CachePackage<T> {
16    pub fn new(cap: usize, mem_size: usize) -> Self {
17        let list = Arc::new(SyncVec::new());
18        let list_c = list.clone();
19        let (tx, rx) = bounded(cap);
20        // let (tx, rx) = crossbeam::channel::bounded(cap);
21        if mem_size < 1024 {
22            debug!("cache_package mem_size: {} bt", mem_size);
23        } else if mem_size < 1024 * 1024 {
24            debug!("cache_package mem_size: {:.2} kb", mem_size as f64 / 1024.0);
25        } else {
26            debug!(
27                "cache_package mem_size: {:.2} mb",
28                mem_size as f64 / 1024.0 / 1024.0
29            );
30        }
31        let _thread_join = std::thread::Builder::new()
32            .stack_size(mem_size)
33            .spawn(move || loop {
34                let i = list_c.push_return(T::default());
35                if let Err(_e) = tx.send(i) {
36                    error!("cache_package tx.send error[{i}]: {_e}");
37                    #[cfg(debug_assertions)]
38                    println!("cache_package tx.send error[{i}]: {_e}");
39                    std::thread::sleep(std::time::Duration::from_secs(1));
40                }
41            })
42            .unwrap();
43        Self { list, receiver_index: rx, _thread_join }
44    }
45
46    #[inline(always)]
47    pub fn next(&self) -> ReadGuard<'_, T> {
48        let i = self.receiver_index.recv().unwrap_or_else(|_e| {
49            let i = self.list.push_return(T::default());
50            warn!("cache_package.try_recv error[{i}]: {_e}");
51            i
52        });
53        self.list.get_uncheck(i)
54    }
55}
56
57/*
58mod test {
59
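    /// Cache entries are generated ahead of time so callers only have to request one
    /// when needed; after all, building a 3000-element array takes about 100 microseconds.
    /// Pre-generates 10 entries.
    /// Usage: 行情_缓存.next()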
63    static 行情_缓存: StaticType<CachePackage<List_高频数据<QTY_前后封单_行情端, 3000>>> = StaticType::new();
64
65    pub fn init_static() -> IResult {
66        行情_缓存.init_call(|| CachePackage::new(3001));
67        Ok(())
68    }
69
70    #[test]
71    fn test_static_data() {
72        std::thread::Builder::new()
73            .stack_size(1024 * 1204 * 1024)
74            .spawn(|| {
75                init_static().unwrap();
76                std::thread::sleep(Duration::from_secs(3));
77
78                let count = 2000;
79                let mut time_elapse = Duration::from_micros(0);
80
81                for _ in 0..count {
82                    let start = std::time::Instant::now();
83                    let list = 行情_缓存.next();
84                    list.push(Default::default());
85                    list.push(Default::default());
86                    time_elapse += start.elapsed();
87                }
88
89                println!("计算次数 {count}, 平均耗时: {:?}", time_elapse / count);
90            })
91            .unwrap()
92            .join()
93            .unwrap();
94    }
95}
96*/