fast_able/cache.rs
1use std::{sync::Arc, thread::JoinHandle, time::Duration};
2
3use crossbeam::channel::{bounded, Receiver};
4
5// use kanal::{bounded, Receiver};
6
7use crate::vec::{ReadGuard, SyncVec};
8
9/// Pre-generated cache package
10/// 提前生成的缓存包
11pub struct CachePackage<T: Default> {
12 list: Arc<SyncVec<T>>,
13 receiver_index: Receiver<usize>,
14 _thread_join: JoinHandle<()>,
15}
16
17impl<T: Default + 'static> CachePackage<T> {
18 /// Create a new cache package
19 /// 创建一个新的缓存包
20 pub fn new(cap: usize, mem_size: usize) -> Self {
21 let list = Arc::new(SyncVec::new());
22 let list_c = list.clone();
23 let (tx, rx) = bounded(cap);
24 // let (tx, rx) = crossbeam::channel::bounded(cap);
25 if mem_size < 1024 {
26 debug!("cache_package mem_size: {} bt", mem_size);
27 } else if mem_size < 1024 * 1024 {
28 debug!("cache_package mem_size: {:.2} kb", mem_size as f64 / 1024.0);
29 } else {
30 debug!(
31 "cache_package mem_size: {:.2} mb",
32 mem_size as f64 / 1024.0 / 1024.0
33 );
34 }
35 let _thread_join = std::thread::Builder::new()
36 .stack_size(mem_size)
37 .spawn(move || loop {
38 let i = list_c.push_return(T::default());
39 if let Err(_e) = tx.send(i) {
40 error!("cache_package tx.send error[{i}]: {_e}");
41 #[cfg(debug_assertions)]
42 println!("cache_package tx.send error[{i}]: {_e}");
43 std::thread::sleep(std::time::Duration::from_secs(1));
44 }
45 })
46 .unwrap();
47 Self { list, receiver_index: rx, _thread_join }
48 }
49
50 /// Get the next item from the cache
51 /// 从缓存中获取下一个项目
52 #[inline(always)]
53 pub fn next(&self) -> ReadGuard<'_, T> {
54 let i = self.receiver_index.recv().unwrap_or_else(|_e| {
55 let i = self.list.push_return(T::default());
56 warn!("cache_package.try_recv error[{i}]: {_e}");
57 i
58 });
59 self.list.get_uncheck(i)
60 }
61}
62
63/*
64mod test {
65
66 /// 提前生成的缓存,在使用申请缓存即可, 必竟要生成一个3千元素的数组需要100微秒
67 /// 提前生成10个
68 /// 使用: 行情_缓存.try_recv()
69 static 行情_缓存: StaticType<CachePackage<List_高频数据<QTY_前后封单_行情端, 3000>>> = StaticType::new();
70
71 pub fn init_static() -> IResult {
72 行情_缓存.init_call(|| CachePackage::new(3001));
73 Ok(())
74 }
75
76 #[test]
77 fn test_static_data() {
78 std::thread::Builder::new()
79 .stack_size(1024 * 1204 * 1024)
80 .spawn(|| {
81 init_static().unwrap();
82 std::thread::sleep(Duration::from_secs(3));
83
84 let count = 2000;
85 let mut time_elapse = Duration::from_micros(0);
86
87 for _ in 0..count {
88 let start = std::time::Instant::now();
89 let list = 行情_缓存.next();
90 list.push(Default::default());
91 list.push(Default::default());
92 time_elapse += start.elapsed();
93 }
94
95 println!("计算次数 {count}, 平均耗时: {:?}", time_elapse / count);
96 })
97 .unwrap()
98 .join()
99 .unwrap();
100 }
101}
102*/