1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use super::buckets::get_affinity;
use super::storage::StorageKeyId;
use super::DhtNode;
use crate::utils::*;
pub struct PeersIter {
key_id: StorageKeyId,
peer_ids: Vec<(u8, AdnlNodeIdShort)>,
index: usize,
}
impl PeersIter {
pub fn with_key_id(key_id: StorageKeyId) -> Self {
Self {
key_id,
peer_ids: Default::default(),
index: 0,
}
}
pub fn next(&mut self) -> Option<AdnlNodeIdShort> {
self.peer_ids.pop().map(|(_, peer_id)| peer_id)
}
pub fn fill(&mut self, dht: &DhtNode, batch_len: Option<usize>) {
while let Some(peer_id) = self.next_known_peer(dht) {
let affinity = get_affinity(&self.key_id, peer_id.as_slice());
let add = match (self.peer_ids.last(), batch_len) {
(None, _) | (_, None) => true,
(Some((top_affinity, _)), Some(batch_len)) => {
*top_affinity <= affinity || self.peer_ids.len() < batch_len
}
};
if add {
self.peer_ids.push((affinity, peer_id))
}
}
self.peer_ids
.sort_unstable_by_key(|(affinity, _)| *affinity);
if let Some(batch_len) = batch_len {
let mut iter = self.peer_ids.iter().rev();
if let Some((top_affinity, _)) = iter.next() {
let mut remaining_count = 0;
for (affinity, _) in iter {
if *affinity >= *top_affinity || remaining_count < batch_len {
remaining_count += 1;
} else {
break;
}
}
self.peer_ids.drain(..self.peer_ids.len() - remaining_count);
}
}
}
fn next_known_peer(&mut self, dht: &DhtNode) -> Option<AdnlNodeIdShort> {
loop {
let peer_id = dht.known_peers().get(self.index);
self.index += 1;
if let Some(peer) = &peer_id {
if dht.is_bad_peer(peer) {
continue;
}
}
break peer_id;
}
}
}