cruzbit/
block_queue.rs

1use std::collections::{HashMap, VecDeque};
2use std::net::SocketAddr;
3use std::sync::RwLock;
4use std::time::{Duration, SystemTime};
5
6use crate::block::BlockID;
7
8/// If a block has been in the queue for more than 2 minutes it can be re-added with a new peer responsible for its download.
9const MAX_QUEUE_WAIT: Duration = Duration::from_secs(2 * 60);
10
11/// A queue of blocks to download.
12pub struct BlockQueue {
13    inner: RwLock<BlockQueueInner>,
14}
15
16struct BlockQueueInner {
17    block_map: HashMap<BlockID, usize>,
18    block_queue: VecDeque<BlockQueueEntry>,
19}
20
21#[derive(Clone)]
22struct BlockQueueEntry {
23    id: BlockID,
24    who: SocketAddr,
25    when: SystemTime,
26}
27
28impl BlockQueue {
29    /// Returns a new instance of a BlockQueue
30    pub fn new() -> Self {
31        Self {
32            inner: RwLock::new(BlockQueueInner {
33                block_map: HashMap::new(),
34                block_queue: VecDeque::new(),
35            }),
36        }
37    }
38
39    /// Adds the block ID to the back of the queue and records the address of the peer who pushed it if it didn't exist in the queue.
40    /// If it did exist and MAX_QUEUE_WAIT has elapsed, the block is left in its position but the peer responsible for download is updated.
41    pub fn add(&self, id: &BlockID, who: &SocketAddr) -> bool {
42        let mut inner = self.inner.write().unwrap();
43
44        if let Some(&index) = inner.block_map.get(id) {
45            let entry = &mut inner.block_queue[index];
46            let elapsed = entry
47                .when
48                .elapsed()
49                .expect("couldn't get elapsed time from block queue entry");
50            if elapsed < MAX_QUEUE_WAIT {
51                // it's still pending download
52                return false;
53            }
54
55            // it's expired. signal that it can be tried again and leave it in place
56            entry.when = SystemTime::now();
57            // new peer owns its place in the queue
58            entry.who = *who;
59            return true;
60        }
61
62        // add to the back of the queue
63        let entry = BlockQueueEntry {
64            id: *id,
65            who: *who,
66            when: SystemTime::now(),
67        };
68        inner.block_queue.push_back(entry);
69        let index = inner.block_queue.len() - 1;
70        inner.block_map.insert(*id, index);
71
72        true
73    }
74
75    /// Removes the block ID from the queue only if the requester is who is currently responsible for its download.
76    pub fn remove(&self, id: &BlockID, who: &SocketAddr) -> bool {
77        let mut inner = self.inner.write().unwrap();
78
79        if let Some(&index) = inner.block_map.get(id) {
80            if inner.block_queue[index].who == *who {
81                inner.block_queue.remove(index);
82                inner.block_map.remove(id);
83                // update indices in map for all elements after the removed one
84                for (_, idx) in inner.block_map.iter_mut() {
85                    if *idx > index {
86                        *idx -= 1;
87                    }
88                }
89
90                return true;
91            }
92        }
93
94        false
95    }
96
97    /// Returns true if the block ID exists in the queue.
98    pub fn exists(&self, id: &BlockID) -> bool {
99        self.inner.read().unwrap().block_map.contains_key(id)
100    }
101
102    /// Returns the ID of the block at the front of the queue.
103    pub fn peek(&self) -> Option<BlockID> {
104        let inner = self.inner.read().unwrap();
105        inner.block_queue.front().map(|entry| entry.id)
106    }
107
108    /// Returns the length of the queue.
109    pub fn len(&self) -> usize {
110        self.inner.read().unwrap().block_queue.len()
111    }
112
113    /// Returns true if the queue has a length of 0.
114    pub fn is_empty(&self) -> bool {
115        self.len() == 0
116    }
117}
118
119impl Default for BlockQueue {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use std::str::FromStr;
129
130    fn make_test_id(n: u8) -> BlockID {
131        BlockID::from(&[n; 32][..])
132    }
133
134    fn make_test_addr(port: u16) -> SocketAddr {
135        SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap()
136    }
137
138    #[test]
139    fn test_add_remove() {
140        let queue = BlockQueue::new();
141        let id1 = make_test_id(1);
142        let id2 = make_test_id(2);
143        let addr1 = make_test_addr(8080);
144        let addr2 = make_test_addr(8081);
145
146        // add two blocks
147        assert!(queue.add(&id1, &addr1));
148        assert!(queue.add(&id2, &addr1));
149        assert_eq!(queue.len(), 2);
150
151        // try to add same block again (should fail)
152        assert!(!queue.add(&id1, &addr2));
153        assert_eq!(queue.len(), 2);
154
155        // remove first block
156        assert!(queue.remove(&id1, &addr1));
157        assert_eq!(queue.len(), 1);
158
159        // try to remove with wrong address (should fail)
160        assert!(!queue.remove(&id2, &addr2));
161        assert_eq!(queue.len(), 1);
162
163        // remove second block
164        assert!(queue.remove(&id2, &addr1));
165        assert_eq!(queue.len(), 0);
166    }
167
168    #[test]
169    fn test_peek_order() {
170        let queue = BlockQueue::new();
171        let id1 = make_test_id(1);
172        let id2 = make_test_id(2);
173        let id3 = make_test_id(3);
174        let addr = make_test_addr(8080);
175
176        // add three blocks
177        queue.add(&id1, &addr);
178        queue.add(&id2, &addr);
179        queue.add(&id3, &addr);
180        assert_eq!(queue.peek(), Some(id1));
181
182        // remove middle block
183        queue.remove(&id2, &addr);
184        assert_eq!(queue.peek(), Some(id1));
185
186        // remove first block
187        queue.remove(&id1, &addr);
188        assert_eq!(queue.peek(), Some(id3));
189    }
190}