1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use std::{net::SocketAddr, time::Duration};
use snarkos_metrics::wrapped_mpsc;
use snarkos_storage::Digest;
use tokio::time::Instant;
use crate::{Node, Peer, SyncInbound};
use anyhow::*;
/// Shared base state for block synchronization: owns a handle to the local
/// node and the receiving half of the inbound sync-message channel.
pub struct SyncBase {
/// Handle to the local node (storage, peer book, logging context).
pub node: Node,
/// Receiver for `SyncInbound` messages sent by peers during a sync session;
/// the matching sender is handed out by `SyncBase::new`.
pub incoming: wrapped_mpsc::Receiver<SyncInbound>,
}
impl SyncBase {
    /// Creates a `SyncBase` bound to `node` and returns it together with the
    /// sender half of its inbound sync-message channel (bounded, capacity 256,
    /// instrumented under the `SYNC_ITEMS` metrics queue).
    pub fn new(node: Node) -> (Self, wrapped_mpsc::Sender<SyncInbound>) {
        let (sender, receiver) = wrapped_mpsc::channel(snarkos_metrics::queues::SYNC_ITEMS, 256);
        let new = Self {
            node,
            incoming: receiver,
        };
        (new, sender)
    }

    /// Returns connected, well-behaved peers whose claimed chain is at least
    /// 2 blocks ahead of ours, sorted by descending block height.
    ///
    /// After sorting, the list is truncated just past the first peer whose
    /// height is within 10 blocks of ours, keeping the clearly-ahead peers
    /// plus at most one borderline peer.
    ///
    /// # Errors
    /// Propagates storage errors from reading the canon block height.
    pub async fn find_sync_nodes(&self) -> Result<Vec<Peer>> {
        let our_block_height = self.node.storage.canon().await?.block_height;
        let mut interesting_peers = vec![];
        for mut node in self.node.peer_book.connected_peers_snapshot().await {
            // `judge_bad` needs `&mut self`, hence the `mut` binding above.
            let judge_bad = node.judge_bad();
            if !judge_bad && node.block_height as usize > our_block_height + 1 {
                interesting_peers.push(node);
            }
        }
        // Tallest claimed chains first.
        interesting_peers.sort_by(|x, y| y.block_height.cmp(&x.block_height));
        // Cut off everything after the first peer within 10 blocks of us.
        if let Some(i) = interesting_peers
            .iter()
            .position(|x| x.block_height as usize <= our_block_height + 10)
        {
            interesting_peers.truncate(i + 1);
        }
        if !interesting_peers.is_empty() {
            info!("found {} interesting peers for sync", interesting_peers.len());
            trace!("sync interesting peers = {:?}", interesting_peers);
        }
        Ok(interesting_peers)
    }

    /// Builds the block-locator hashes advertised to peers during sync.
    ///
    /// Scans storage for recent forks (within `OLDEST_FORK_THRESHOLD`),
    /// collects the tip of each fork path at least 2 blocks long — capped at
    /// `MAX_BLOCK_SYNC_COUNT` tips — and asks storage for locator hashes
    /// covering those tips.
    ///
    /// # Errors
    /// Propagates storage errors from the fork scan, path walk, or locator
    /// hash computation (the latter is also logged before propagation).
    pub async fn block_locator_hashes(node: &Node) -> Result<Vec<Digest>> {
        let forks_of_interest = node
            .storage
            .scan_forks(snarkos_consensus::OLDEST_FORK_THRESHOLD as u32)
            .await?;
        trace!("sync found {} forks", forks_of_interest.len());
        let blocks_of_interest: Vec<Digest> = forks_of_interest.into_iter().map(|(_canon, fork)| fork).collect();
        let mut tips_of_blocks_of_interest: Vec<Digest> = Vec::with_capacity(blocks_of_interest.len());
        for block in blocks_of_interest {
            // `>=` so the cap is honored exactly; the previous `>` check let
            // one extra tip slip through before breaking.
            if tips_of_blocks_of_interest.len() >= crate::MAX_BLOCK_SYNC_COUNT as usize {
                debug!("reached limit of blocks of interest in sync block locator hashes");
                break;
            }
            let mut fork_path = node.storage.longest_child_path(&block).await?;
            // A path shorter than 2 is just the fork point itself — nothing to advertise.
            if fork_path.len() < 2 {
                continue;
            }
            // `pop` is safe: the length check above guarantees a last element.
            tips_of_blocks_of_interest.push(fork_path.pop().unwrap());
        }
        let hashes = node
            .storage
            .get_block_locator_hashes(tips_of_blocks_of_interest, snarkos_consensus::OLDEST_FORK_THRESHOLD)
            .await
            .map_err(|e| {
                error!("Unable to get block locator hashes from storage: {:?}", e);
                e
            })?;
        Ok(hashes)
    }

    /// Drains inbound sync messages, invoking `handler` on each until the
    /// handler returns `true`, the channel closes, or time runs out.
    ///
    /// The deadline starts `timeout_sec` seconds from now; each received
    /// message extends it by `moving_timeout_sec` seconds, but never past the
    /// original hard deadline.
    pub async fn receive_messages<F: FnMut(SyncInbound) -> bool>(
        &mut self,
        timeout_sec: u64,
        moving_timeout_sec: u64,
        mut handler: F,
    ) {
        let must_end = Instant::now() + Duration::from_secs(timeout_sec);
        let timeout = tokio::time::sleep_until(must_end);
        let extra_time = Duration::from_secs(moving_timeout_sec);
        tokio::pin!(timeout);
        loop {
            tokio::select! {
                // `biased` polls the timeout arm first, guaranteeing
                // termination even under a continuous flood of messages.
                biased;
                _ = timeout.as_mut() => {
                    break;
                }
                msg = self.incoming.recv() => {
                    let msg = match msg {
                        Some(msg) => msg,
                        // `None` means every sender has been dropped.
                        None => break,
                    };
                    if handler(msg) {
                        break;
                    }
                    // Grant a grace period for the next message, capped at the
                    // hard deadline.
                    let new_timeout = (Instant::now() + extra_time).min(must_end);
                    timeout.as_mut().reset(new_timeout);
                },
            }
        }
    }

    /// Asks each currently-connected peer among `addresses` to cancel any
    /// outstanding sync, awaiting all cancellations concurrently. Addresses
    /// with no live peer handle are silently skipped.
    pub async fn cancel_outstanding_syncs(&self, addresses: &[SocketAddr]) {
        let mut future_set = vec![];
        for addr in addresses {
            if let Some(peer) = self.node.peer_book.get_peer_handle(*addr) {
                future_set.push(async move {
                    peer.cancel_sync().await;
                });
            }
        }
        futures::future::join_all(future_set).await;
    }
}