1use std::{
2 collections::{HashMap, VecDeque},
3 sync::atomic::AtomicU64,
4};
5
6use atm0s_sdn_identity::NodeId;
7use atm0s_sdn_router::shadow::ShadowRouterHistory;
8use parking_lot::Mutex;
9
/// How long, in milliseconds, a recorded broadcast key is kept before
/// `set_ts` drops it from the history.
const HISTORY_TIMEOUT_MS: u64 = 2_000;
11
12#[derive(Debug, Default)]
13pub struct DataWorkerHistory {
14 now_ms: AtomicU64,
15 #[allow(clippy::type_complexity)]
16 queue: Mutex<VecDeque<(u64, (Option<NodeId>, u8, u16))>>,
17 #[allow(clippy::type_complexity)]
18 map: Mutex<HashMap<(Option<NodeId>, u8, u16), bool>>,
19}
20
21impl ShadowRouterHistory for DataWorkerHistory {
22 fn already_received_broadcast(&self, from: Option<NodeId>, service: u8, seq: u16) -> bool {
23 let mut map = self.map.lock();
24 let mut queue = self.queue.lock();
25 let now_ms = self.now_ms.load(std::sync::atomic::Ordering::Relaxed);
26 if map.contains_key(&(from, service, seq)) {
27 return true;
28 }
29
30 map.insert((from, service, seq), true);
31 queue.push_back((now_ms, (from, service, seq)));
32 if queue.len() > 10000 {
33 let (_ts, pair) = queue.pop_front().expect("queue should not empty");
34 map.remove(&pair);
35 }
36 false
37 }
38
39 fn set_ts(&self, now_ms: u64) {
40 self.now_ms.store(now_ms, std::sync::atomic::Ordering::Relaxed);
41 let mut map = self.map.lock();
42 let mut queue = self.queue.lock();
43
44 while let Some((time, key)) = queue.front() {
45 if now_ms >= *time + HISTORY_TIMEOUT_MS {
46 map.remove(key);
47 queue.pop_front();
48 } else {
49 break;
50 }
51 }
52 }
53}
54
#[cfg(test)]
mod tests {
    use atm0s_sdn_router::shadow::ShadowRouterHistory;

    use super::{DataWorkerHistory, HISTORY_TIMEOUT_MS};

    /// A key is new on first sight, known on the second, and forgotten once
    /// the timeout has elapsed.
    #[test]
    fn simple_work() {
        let history = DataWorkerHistory::default();

        assert!(!history.already_received_broadcast(Some(1), 1, 1));
        assert!(history.already_received_broadcast(Some(1), 1, 1));

        history.set_ts(HISTORY_TIMEOUT_MS);
        assert!(!history.already_received_broadcast(Some(1), 1, 1));
    }

    /// One millisecond before the timeout the key must still be remembered.
    #[test]
    fn keeps_entry_until_timeout() {
        let history = DataWorkerHistory::default();

        assert!(!history.already_received_broadcast(Some(1), 1, 1));

        history.set_ts(HISTORY_TIMEOUT_MS - 1);
        assert!(history.already_received_broadcast(Some(1), 1, 1));
    }

    /// Keys differing in any component are tracked independently.
    #[test]
    fn distinct_keys_are_tracked_separately() {
        let history = DataWorkerHistory::default();

        assert!(!history.already_received_broadcast(Some(1), 1, 1));
        assert!(!history.already_received_broadcast(Some(2), 1, 1));
        assert!(!history.already_received_broadcast(None, 1, 1));
        assert!(!history.already_received_broadcast(Some(1), 2, 1));
        assert!(!history.already_received_broadcast(Some(1), 1, 2));

        assert!(history.already_received_broadcast(Some(1), 1, 1));
    }
}