1use std::collections::{HashMap, VecDeque};
2use std::net::SocketAddr;
3use std::sync::RwLock;
4use std::time::{Duration, SystemTime};
5
6use crate::block::BlockID;
7
8const MAX_QUEUE_WAIT: Duration = Duration::from_secs(2 * 60);
10
11pub struct BlockQueue {
13 inner: RwLock<BlockQueueInner>,
14}
15
16struct BlockQueueInner {
17 block_map: HashMap<BlockID, usize>,
18 block_queue: VecDeque<BlockQueueEntry>,
19}
20
21#[derive(Clone)]
22struct BlockQueueEntry {
23 id: BlockID,
24 who: SocketAddr,
25 when: SystemTime,
26}
27
28impl BlockQueue {
29 pub fn new() -> Self {
31 Self {
32 inner: RwLock::new(BlockQueueInner {
33 block_map: HashMap::new(),
34 block_queue: VecDeque::new(),
35 }),
36 }
37 }
38
39 pub fn add(&self, id: &BlockID, who: &SocketAddr) -> bool {
42 let mut inner = self.inner.write().unwrap();
43
44 if let Some(&index) = inner.block_map.get(id) {
45 let entry = &mut inner.block_queue[index];
46 let elapsed = entry
47 .when
48 .elapsed()
49 .expect("couldn't get elapsed time from block queue entry");
50 if elapsed < MAX_QUEUE_WAIT {
51 return false;
53 }
54
55 entry.when = SystemTime::now();
57 entry.who = *who;
59 return true;
60 }
61
62 let entry = BlockQueueEntry {
64 id: *id,
65 who: *who,
66 when: SystemTime::now(),
67 };
68 inner.block_queue.push_back(entry);
69 let index = inner.block_queue.len() - 1;
70 inner.block_map.insert(*id, index);
71
72 true
73 }
74
75 pub fn remove(&self, id: &BlockID, who: &SocketAddr) -> bool {
77 let mut inner = self.inner.write().unwrap();
78
79 if let Some(&index) = inner.block_map.get(id) {
80 if inner.block_queue[index].who == *who {
81 inner.block_queue.remove(index);
82 inner.block_map.remove(id);
83 for (_, idx) in inner.block_map.iter_mut() {
85 if *idx > index {
86 *idx -= 1;
87 }
88 }
89
90 return true;
91 }
92 }
93
94 false
95 }
96
97 pub fn exists(&self, id: &BlockID) -> bool {
99 self.inner.read().unwrap().block_map.contains_key(id)
100 }
101
102 pub fn peek(&self) -> Option<BlockID> {
104 let inner = self.inner.read().unwrap();
105 inner.block_queue.front().map(|entry| entry.id)
106 }
107
108 pub fn len(&self) -> usize {
110 self.inner.read().unwrap().block_queue.len()
111 }
112
113 pub fn is_empty(&self) -> bool {
115 self.len() == 0
116 }
117}
118
119impl Default for BlockQueue {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use std::str::FromStr;
129
130 fn make_test_id(n: u8) -> BlockID {
131 BlockID::from(&[n; 32][..])
132 }
133
134 fn make_test_addr(port: u16) -> SocketAddr {
135 SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap()
136 }
137
138 #[test]
139 fn test_add_remove() {
140 let queue = BlockQueue::new();
141 let id1 = make_test_id(1);
142 let id2 = make_test_id(2);
143 let addr1 = make_test_addr(8080);
144 let addr2 = make_test_addr(8081);
145
146 assert!(queue.add(&id1, &addr1));
148 assert!(queue.add(&id2, &addr1));
149 assert_eq!(queue.len(), 2);
150
151 assert!(!queue.add(&id1, &addr2));
153 assert_eq!(queue.len(), 2);
154
155 assert!(queue.remove(&id1, &addr1));
157 assert_eq!(queue.len(), 1);
158
159 assert!(!queue.remove(&id2, &addr2));
161 assert_eq!(queue.len(), 1);
162
163 assert!(queue.remove(&id2, &addr1));
165 assert_eq!(queue.len(), 0);
166 }
167
168 #[test]
169 fn test_peek_order() {
170 let queue = BlockQueue::new();
171 let id1 = make_test_id(1);
172 let id2 = make_test_id(2);
173 let id3 = make_test_id(3);
174 let addr = make_test_addr(8080);
175
176 queue.add(&id1, &addr);
178 queue.add(&id2, &addr);
179 queue.add(&id3, &addr);
180 assert_eq!(queue.peek(), Some(id1));
181
182 queue.remove(&id2, &addr);
184 assert_eq!(queue.peek(), Some(id1));
185
186 queue.remove(&id1, &addr);
188 assert_eq!(queue.peek(), Some(id3));
189 }
190}