atm0s_sdn/
history.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    sync::atomic::AtomicU64,
4};
5
6use atm0s_sdn_identity::NodeId;
7use atm0s_sdn_router::shadow::ShadowRouterHistory;
8use parking_lot::Mutex;
9
10const HISTORY_TIMEOUT_MS: u64 = 2000;
11
12#[derive(Debug, Default)]
13pub struct DataWorkerHistory {
14    now_ms: AtomicU64,
15    #[allow(clippy::type_complexity)]
16    queue: Mutex<VecDeque<(u64, (Option<NodeId>, u8, u16))>>,
17    #[allow(clippy::type_complexity)]
18    map: Mutex<HashMap<(Option<NodeId>, u8, u16), bool>>,
19}
20
21impl ShadowRouterHistory for DataWorkerHistory {
22    fn already_received_broadcast(&self, from: Option<NodeId>, service: u8, seq: u16) -> bool {
23        let mut map = self.map.lock();
24        let mut queue = self.queue.lock();
25        let now_ms = self.now_ms.load(std::sync::atomic::Ordering::Relaxed);
26        if map.contains_key(&(from, service, seq)) {
27            return true;
28        }
29
30        map.insert((from, service, seq), true);
31        queue.push_back((now_ms, (from, service, seq)));
32        if queue.len() > 10000 {
33            let (_ts, pair) = queue.pop_front().expect("queue should not empty");
34            map.remove(&pair);
35        }
36        false
37    }
38
39    fn set_ts(&self, now_ms: u64) {
40        self.now_ms.store(now_ms, std::sync::atomic::Ordering::Relaxed);
41        let mut map = self.map.lock();
42        let mut queue = self.queue.lock();
43
44        while let Some((time, key)) = queue.front() {
45            if now_ms >= *time + HISTORY_TIMEOUT_MS {
46                map.remove(key);
47                queue.pop_front();
48            } else {
49                break;
50            }
51        }
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use atm0s_sdn_router::shadow::ShadowRouterHistory;
58
59    use crate::history::HISTORY_TIMEOUT_MS;
60
61    use super::DataWorkerHistory;
62
63    #[test]
64    fn simple_work() {
65        let history = DataWorkerHistory::default();
66
67        assert_eq!(history.already_received_broadcast(Some(1), 1, 1), false);
68        assert_eq!(history.already_received_broadcast(Some(1), 1, 1), true);
69
70        //after timeout
71        history.set_ts(HISTORY_TIMEOUT_MS);
72        assert_eq!(history.already_received_broadcast(Some(1), 1, 1), false);
73    }
74}