fast_able/
cache.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::{sync::Arc, thread::JoinHandle, time::Duration};

use crossbeam::channel::{Receiver, bounded};

// use kanal::{bounded, Receiver};

use crate::vec::{ReadGuard, SyncVec};

pub struct CachePackage<T: Default> {
    list: Arc<SyncVec<T>>,
    receiver_index: Receiver<usize>,
    _thread_join: JoinHandle<()>,
}

impl<T: Default + 'static> CachePackage<T> {
    pub fn new(cap: usize, mem_size: usize) -> Self {
        let list = Arc::new(SyncVec::new());
        let list_c = list.clone();
        let (tx, rx) = bounded(cap);
        // let (tx, rx) = crossbeam::channel::bounded(cap);
        debug!("cache_package mem_size: {} kb", mem_size / 1024);
        let _thread_join = std::thread::Builder::new()
            .stack_size(mem_size)
            .spawn(move || loop {
                let v = T::default();
                let i = list_c.push_return(v);
                if let Err(_e) = tx.try_send(i) {
                    // error!("cache_package tx.send error: {_e}");
                    std::thread::sleep(std::time::Duration::from_secs(1));
                }
            })
            .unwrap();
        Self {
            list,
            receiver_index: rx,
            _thread_join,
        }
    }

    #[inline(always)]
    pub fn next(&self) -> ReadGuard<'_, T> {
        self.receiver_index
            .recv()
            .map(|i| self.list.get_uncheck(i))
            .unwrap_or_else(|_e| {
                warn!("cache_package.try_recv error: {_e}");
                let i = self.list.push_return(T::default());
                self.list.get_uncheck(i)
            })
    }
}

/*
mod test {

    /// 提前生成的缓存,在使用申请缓存即可, 必竟要生成一个3千元素的数组需要100微秒
    /// 提前生成10个
    /// 使用: 行情_缓存.try_recv()
    static 行情_缓存: StaticType<CachePackage<List_高频数据<QTY_前后封单_行情端, 3000>>> = StaticType::new();

    pub fn init_static() -> IResult {
        行情_缓存.init_call(|| CachePackage::new(3001));
        Ok(())
    }

    #[test]
    fn test_static_data() {
        std::thread::Builder::new()
            .stack_size(1024 * 1204 * 1024)
            .spawn(|| {
                init_static().unwrap();
                std::thread::sleep(Duration::from_secs(3));

                let count = 2000;
                let mut time_elapse = Duration::from_micros(0);

                for _ in 0..count {
                    let start = std::time::Instant::now();
                    let list = 行情_缓存.next();
                    list.push(Default::default());
                    list.push(Default::default());
                    time_elapse += start.elapsed();
                }

                println!("计算次数 {count}, 平均耗时: {:?}", time_elapse / count);
            })
            .unwrap()
            .join()
            .unwrap();
    }
}
*/