fast_able/
high_speed_cache.rs

1// 高频数据
2
3use crossbeam::atomic::AtomicCell;
4use crate::unsafe_cell_type::U;
5use std::ops::Deref;
6use std::{default::Default, usize};
7use chrono::NaiveTime;
8use crate::fasttime::b2_time::TimeExt;
9use crate::fasttime::a8_micros;
10
11pub trait CacheData: Default + Clone {
12    fn time(&self) -> NaiveTime;
13}
14
15pub struct HighSpeedCache<T, const LEN: usize> {
16    item: U<UnsafeData<T, LEN>>,
17}
18
19impl<T, const LEN: usize> Deref for HighSpeedCache<T, LEN> {
20    type Target = UnsafeData<T, LEN>;
21    fn deref(&self) -> &Self::Target {
22        self.item.as_ref()
23    }
24}
25
26pub struct UnsafeData<T, const LEN: usize> {
27    items: [T; LEN],
28    index: usize,
29    defalut_val: T,
30    pub count: usize,
31    待补数据: AtomicCell<Option<(usize, T)>>,
32}
33
34impl<T, const LEN: usize> HighSpeedCache<T, LEN>
35where
36    T: CacheData,
37{
38    pub fn new() -> Self {
39        // TODO: 生成 [T; LEN]
40        // 当前报错: error[E0277]: the trait bound `T: std::marker::Copy` is not satisfied
41        // 请在不实现 copy 的情况下, 帮我修改这里的代码, 让它编译通过
42        let items: [T; LEN] = std::array::from_fn(|_i| T::default());
43        let item = UnsafeData {
44            items,
45            index: 0,
46            count: 0,
47            defalut_val: T::default(),
48            待补数据: AtomicCell::new(None),
49        };
50        Self { item: item.into() }
51    }
52
53    fn item_mut(&self) -> &mut UnsafeData<T, LEN> {
54        self.item.as_mut()
55    }
56
57    /// 添加1项
58    /// 请控制此方法不可并发访问
59    pub fn push(&self, item_new: T) {
60        let self_item = self.item_mut();
61        if let Some((i, d)) = self.待补数据.swap(None) {
62            self_item.items[i] = d;
63        }
64
65        if self_item.count == 0 {
66            // 此时 index = 0, 对index不操作
67        } else {
68            self_item.index += 1;
69            if self_item.index == LEN {
70                self_item.index = 0;
71            }
72        }
73        self_item.items[self.index] = item_new;
74        self_item.count += 1;
75    }
76
77    /// 当前项
78    pub fn current(&self) -> &T {
79        if self.count == 0 {
80            return &self.item.as_ref().defalut_val;
81        }
82        &self.items[self.index]
83    }
84
85    /// 请控制此方法不可并发访问
86    pub fn clear(&self) {
87        let self_item = self.item_mut();
88        self_item.index = 0;
89        self_item.count = 0;
90        // 第1条数据要清空,并无实质数据。
91        self_item.items[0] = T::default();
92    }
93
94    /// 传参:毫秒
95    /// 如果找不到精确定位的数据, 则返回最新的那条数据
96    pub fn find_找之前的项(&self, 毫秒数: i64, is_找不到即取最新数据: bool) -> Option<&T> {
97        if self.count == 0 {
98            return None;
99        }
100        let capacity = self.items.len();
101        // 将微秒数转为 u64,但只在非负当前情况才如此
102        let 微秒数 = if 毫秒数 >= 0 {
103            毫秒数 as u64 * a8_micros::MICROS_PER_MILLIS
104        } else {
105            0
106        };
107
108        let ticks_当前项 = self.items[self.index].time().micros_of_day();
109        let index_当前索引 = self.index;
110
111        let mut find_最新 = Option::<&T>::None;
112        for idx in (0..index_当前索引).rev() {
113            let ticks_往前项 = self.items[idx].time().micros_of_day();
114            if ticks_往前项 == 0 {
115                continue;
116            }
117            let delta_往前微秒数 = ticks_当前项 - ticks_往前项;
118            if delta_往前微秒数 > 微秒数 {
119                return Some(&self.items[idx]);
120            }
121
122            // 修复: 如果找不到精确定位的数据, 则返回最新的那条数据; 从vec后面开始循环, 第一条数据就是最新的
123            if is_找不到即取最新数据 {
124                match &find_最新 {
125                    None => {
126                        find_最新 = Some(&self.items[idx]);
127                    }
128                    _ => (),
129                }
130            }
131        }
132
133        if self.count <= LEN {
134            // 尚未走出一轮(填满 CAPACTIY 个项),后面没有值
135            return Some(find_最新.unwrap_or_else(|| &self.item.as_ref().defalut_val));
136        }
137
138        // 从数组后面开始找
139        for idx in ((index_当前索引 + 1)..capacity).rev() {
140            let item = &self.items[idx];
141            let ticks_往前项 = item.time().micros_of_day();
142            let delta_往前微秒数 = ticks_当前项 - ticks_往前项;
143            if ticks_往前项 > 0 && delta_往前微秒数 > 微秒数 {
144                return Some(item);
145
146                // 修复: 找前封单时刚好在前一段找不到数据, 再从后面开始找, 匹配到最后一个数据, 但却拿到第一个下标的初始值
147                // if idx + 1 == capacity {
148                //     return &self.items[0];
149                // } else {
150                //     return item;
151                // }
152            }
153        }
154
155        return find_最新;
156    }
157
158    /// 如果十档或者前封单数据如果出现断档的情况,那么把前一个数据的时间戳改成现在数据并且减10毫秒
159    /// 下次找数据时可以直接找到这一条
160    // fn 向前补一个数据(&self) -> Option<T> {
161    //     let idx = if self.index == 0 && self.count >= LEN {
162    //         LEN - 1
163    //     } else if self.index > 0 {
164    //         self.index - 1
165    //     } else {
166    //         // index 等于零, 没有数据的情况
167    //         return None;
168    //     };
169
170    //     let mut item = self.items[idx].clone();
171    //     let millis = self.current().time().micros_of_day() - 10 * 1000;
172    //     let time = NaiveTime::from_micros_day_unsafe(millis);
173    //     item.set_time_发生时间(time);
174    //     self.待补数据.store(Some((idx, item.clone())));
175    //     return Some(item);
176    // }
177
178    pub fn find<F: Fn(&T) -> bool>(&self, f: F) -> Option<&T> {
179        if self.count == 0 {
180            return None;
181        }
182
183        let 当前项 = &self.items[self.index];
184        if f(当前项) {
185            return Some(当前项);
186        }
187
188        let index_当前索引 = self.index;
189        for idx in (0..index_当前索引).rev() {
190            if f(&self.items[idx]) {
191                return Some(&self.items[idx]);
192            }
193        }
194        if self.count <= LEN {
195            // 尚未走出一轮(填满 CAPACTIY 个项),后面没有值
196            return None;
197        }
198
199        for idx in ((index_当前索引 + 1)..self.items.len()).rev() {
200            if f(&self.items[idx]) {
201                return Some(&self.items[idx]);
202            }
203        }
204        None
205    }
206
207    /// 找到范围内最大的值
208    /// n: 毫秒数
209    /// is_return_last: true; 两秒内最大值找不到, 即找一个最新的数据
210    /// compare: 比较两个值, 返回true则使用新的值
211    /// 
212    /// TODO: 不要使用补数据的逻辑, 找不到数据即取一个最新的数据
213    pub fn find_limit_max<F: Fn(&T, &T) -> bool>(
214        &self,
215        n: i64,
216        is_return_last: bool,
217        compare: F,
218    ) -> T {
219        let items = self.list_items(n);
220        let 补一个数据 = None;
221
222        if (items.is_empty() || items.len() == 1) && is_return_last {
223            // TODO: 注释掉不存在的方法调用
224            // 补一个数据 = self.向前补一个数据();
225        }
226
227        if items.is_empty() {
228            return 补一个数据.unwrap_or_else(|| self.defalut_val.clone());
229        }
230
231        let mut find_v = items[0];
232        for item in items.iter().skip(1) {
233            if compare(&find_v, item) {
234                find_v = item;
235            }
236        }
237
238        if let Some(v) = &补一个数据 {
239            if compare(&find_v, v) {
240                find_v = v;
241            }
242        }
243
244        return find_v.clone();
245    }
246
247    /// 传参:找多少个
248    pub fn list_items_len(&self, mut count: usize) -> Vec<&T> {
249        if self.count == 0 {
250            return vec![];
251        }
252        let mut items = vec![];
253
254        let 当前项 = &self.items[self.index];
255        items.push(当前项);
256
257        let index_当前索引 = self.index;
258        for idx in (0..index_当前索引).rev() {
259            if count == 0 {
260                return items;
261            }
262            items.push(&self.items[idx]);
263            count -= 1;
264        }
265
266        if self.count <= LEN {
267            // 尚未走出一轮(填满 CAPACTIY 个项),后面没有值
268            return items;
269        }
270
271        for idx in ((index_当前索引 + 1)..self.items.len()).rev() {
272            if count == 0 {
273                return items;
274            }
275            items.push(&self.items[idx]);
276            count -= 1;
277        }
278        items
279    }
280
281    /// 传参:毫秒
282    pub fn list_items(&self, 毫秒数: i64) -> Vec<&T> {
283        if self.count == 0 {
284            return vec![];
285        }
286        let mut items = vec![];
287
288        // 将微秒数转为 u64,但只在非负当前情况才如此
289        let 微秒数 = if 毫秒数 as i64 >= 0 {
290            毫秒数 as u64 * a8_micros::MICROS_PER_MILLIS
291        } else {
292            0
293        };
294
295        let 当前项 = &self.items[self.index];
296        items.push(当前项);
297
298        let ticks_当前项 = 当前项.time().micros_of_day();
299        let index_当前索引 = self.index;
300        for idx in (0..index_当前索引).rev() {
301            let ticks_往前项 = self.items[idx].time().micros_of_day();
302            let delta_往前微秒数 = ticks_当前项 - ticks_往前项;
303            if delta_往前微秒数 > 微秒数 {
304                // return items;
305                break;
306            }
307            items.push(&self.items[idx]);
308        }
309        if self.count <= LEN {
310            // 尚未走出一轮(填满 CAPACTIY 个项),后面没有值
311            return items;
312        }
313
314        for idx in ((index_当前索引 + 1)..self.items.len()).rev() {
315            let ticks_往前项 = self.items[idx].time().micros_of_day();
316            let delta_往前微秒数 = ticks_当前项 - ticks_往前项;
317            if delta_往前微秒数 > 微秒数 {
318                return items;
319            }
320            // 是不是少了这句代码? todo 志松
321            items.push(&self.items[idx]);
322        }
323        items
324    }
325
326    // 补数据, 十档数据有可能在N秒后才有新数据, 逐笔同理, 把之前缺失的数据补上(10毫秒一个)
327    /* pub fn update_补数据(&mut self, last: TimeFT) {
328        let last_ticks = last.micros_of_day();
329
330        if self.current().time().micros_of_day() + 10 >= last_ticks {
331            return;
332        }
333
334        let mut 最新时间的数据 = Option::<T>::None;
335        for ele in &self.items {
336            match 最新时间的数据{
337                Some(v) => {
338                    if v.time().micros_of_day() <= ele.time().micros_of_day() {
339                        最新时间的数据 = Some(ele.clone());
340                    }
341                },
342                None => {
343                    最新时间的数据 = Some(ele.clone());
344                }
345            }
346        }
347
348        let mut 内存_最新数据 = match 最新时间的数据 {
349            Some(v) => v,
350            None => return,
351        };
352
353        let mut 内存_最新数据时间 = 内存_最新数据.time().micros_of_day();
354
355        while last_ticks > 内存_最新数据时间 + 10{
356            内存_最新数据时间 += 10;
357            内存_最新数据.set_接收时间(TimeFT::from_micros_day_unsafe(内存_最新数据时间));
358            self.push(&内存_最新数据);
359        }
360
361    } */
362}
363
364impl<T: CacheData, const LEN: usize> Default for HighSpeedCache<T, LEN> {
365    fn default() -> Self {
366        Self::new()
367    }
368}
369
370#[cfg(test)]
371mod test_高频数据 {
372    use chrono::NaiveTime;
373    use crate::fasttime::b2_time::TimeExt;
374
375    use super::{CacheData, HighSpeedCache};
376
377    #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
378    struct Demo_高频数据 {
379        pub time_发生时间: NaiveTime,
380        pub time_接收时间: NaiveTime,
381        pub val: i32,
382    }
383
384    impl CacheData for Demo_高频数据 {
385        fn time(&self) -> NaiveTime {
386            self.time_发生时间
387        }
388    }
389
390    #[test]
391    fn test_demo_高频数据() {
392        let HighSpeedCache = HighSpeedCache::<Demo_高频数据, 1000>::new();
393
394        // 使用固定增量代替随机数,避免依赖 rand
395        let mut micros = NaiveTime::from_hmsi_friendly_unsafe(93000_000).micros_of_day();
396        for val in 1..1000 {
397            let time_发生时间 = NaiveTime::from_micros_day_unsafe(micros);
398            micros += 100; // 固定增加100微秒
399
400            HighSpeedCache.push(Demo_高频数据 {
401                time_发生时间,
402                time_接收时间: time_发生时间,
403                val,
404            });
405            _ = HighSpeedCache.find_找之前的项(30_000, false);
406        }
407    }
408}