// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
use std::cmp::Ordering;
use std::ops::Deref;
use node_data::message::payload::{GetResource, Inv, Quorum};
use node_data::message::Message;
use super::*;
use crate::chain::fallback;
/// State handler active while the node believes it is in sync with the
/// main chain.
///
/// Besides accepting direct successors of the tip, it runs a lightweight
/// "presync" validation before switching to out-of-sync mode: a peer that
/// claims to be ahead is first asked for the block at `tip + 1`, and only a
/// valid answer triggers the mode switch (see `on_quorum`/`on_block_event`).
pub(super) struct InSyncImpl<DB: database::DB, VM: vm::VMExecution, N: Network>
{
    /// Shared handle to the block acceptor (chain tip, DB access, accept/revert).
    acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
    /// Shared handle to the network layer, used to message/request peers.
    network: Arc<RwLock<N>>,
    /// Hashes of blocks discarded by a fallback; re-received copies are ignored.
    /// Cleared once a final block is accepted while in sync.
    blacklisted_blocks: SharedHashSet,
    /// Metadata of the in-progress presync validation, if any.
    /// `None` means no peer is currently being verified.
    presync: Option<PresyncInfo>,
}
impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
    /// Creates a new in-sync state handler with no active presync process.
    pub fn new(
        acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
        network: Arc<RwLock<N>>,
        blacklisted_blocks: SharedHashSet,
    ) -> Self {
        Self {
            acc,
            network,
            blacklisted_blocks,
            presync: None,
        }
    }

    /// performed when entering the state
    ///
    /// If `blk` is the direct successor of the current tip, it is accepted
    /// right away so the in-sync state starts from an up-to-date chain.
    pub async fn on_entering(&mut self, blk: &Block) -> anyhow::Result<()> {
        let mut acc = self.acc.write().await;
        let curr_h = acc.get_curr_height().await;

        if blk.header().height == curr_h + 1 {
            acc.accept_block(blk, true).await?;
        }

        info!(event = "entering in-sync", height = curr_h);

        Ok(())
    }

    /// performed when exiting the state
    pub async fn on_exiting(&mut self) {
        // Drop any in-progress presync validation; it is only meaningful
        // while in sync.
        self.presync = None
    }

    /// Handles a `Quorum` message received while in sync.
    ///
    /// A quorum for a round well beyond our tip hints that we may be out of
    /// sync; instead of switching immediately, this starts a presync
    /// validation against the sending peer (identified via `metadata`).
    pub async fn on_quorum(
        &mut self,
        remote_quorum: &Quorum,
        metadata: Option<&Metadata>,
    ) {
        // If remote_blk.height > tip.height+1, we might be out of sync.
        // Before switching to outOfSync mode and download missing blocks,
        // we ensure that the peer has a valid successor of tip
        if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
            // If there's no active presync process, we proceed with validation
            if self.presync.is_none() {
                let tip_height = self.acc.read().await.get_curr_height().await;
                // We use the quorum's previous block, to be sure that network
                // already have the full block available
                let remote_height = remote_quorum.header.round - 1;

                // Don't compare with `= tip + 1` because that's supposed to be
                // handled by the InSync
                if remote_height > tip_height + 1 {
                    // Initialize the presync process, storing metadata about
                    // the peer, the remote height, and our current tip height.
                    // This serves as a safeguard to avoid switching into
                    // out-of-sync mode without verifying the peer's
                    // information.
                    self.presync = Some(PresyncInfo::from_height(
                        peer_addr,
                        remote_height,
                        tip_height,
                    ));

                    // Request the block immediately following our tip height
                    // from the peer to verify if the peer has a valid
                    // continuation of our chain.
                    // If the requested block (from the same peer) is accepted
                    // by the on_block_event before the presync timer expires,
                    // we will transition into out_of_sync mode.
                    self.request_block(tip_height + 1, peer_addr).await;
                }
            }
        }
    }

    /// Return Some if there is the need to switch to OutOfSync mode.
    /// This way the sync-up procedure to download all missing blocks from the
    /// main chain will be triggered
    ///
    /// Handles three height ranges for `remote_blk`:
    /// - `height <= tip`: potential fork/fallback handling (see the
    ///   iteration comparison below);
    /// - `height == tip + 1`: direct successor — accept it, and possibly
    ///   complete an active presync;
    /// - `height > tip + 1`: possible out-of-sync — start or feed the
    ///   presync process for the sending peer.
    pub async fn on_block_event(
        &mut self,
        remote_blk: &Block,
        metadata: Option<Metadata>,
    ) -> anyhow::Result<Option<PresyncInfo>> {
        let mut acc = self.acc.write().await;
        let tip_header = acc.tip_header().await;
        let tip_height = tip_header.height;
        let remote_header = remote_blk.header();
        let remote_height = remote_header.height;

        // If we already accepted a block with the same height as remote_blk,
        // check if remote_blk has higher priority. If so, we revert to its
        // prev_block, and accept it as the new tip
        if remote_height <= tip_height {
            // Ensure the block is different from what we have in our chain
            if remote_height == tip_height {
                if remote_header.hash == tip_header.hash {
                    return Ok(None);
                }
            } else {
                let blk_exists = acc
                    .db
                    .read()
                    .await
                    .view(|t| t.block_exists(&remote_header.hash))?;

                if blk_exists {
                    return Ok(None);
                }
            }

            // Ensure remote_blk is higher than the last finalized
            // We do this check after the previous one because
            // get_last_final_block is heavy
            if remote_height
                <= acc.get_last_final_block().await?.header().height
            {
                return Ok(None);
            }

            // Check if prev_blk is in our chain
            // If not, remote_blk is on a fork
            let prev_blk_exists = acc
                .db
                .read()
                .await
                .view(|t| t.block_exists(&remote_header.prev_block_hash))?;

            if !prev_blk_exists {
                warn!(
                    "received block from fork at height {remote_height}: {}",
                    to_str(&remote_header.hash)
                );
                return Ok(None);
            }

            // Fetch the chain block at the same height as remote_blk
            let local_blk = if remote_height == tip_height {
                acc.tip.read().await.inner().clone()
            } else {
                acc.db
                    .read()
                    .await
                    .view(|t| t.block_by_height(remote_height))?
                    .expect("local block should exist")
            };

            let local_header = local_blk.header();
            let local_height = local_header.height;

            // Lower iteration wins: a candidate produced at an earlier
            // iteration has higher priority than ours at the same height.
            match remote_header.iteration.cmp(&local_header.iteration) {
                Ordering::Less => {
                    // If remote_blk.iteration < local_blk.iteration, then we
                    // fallback to prev_blk and accept remote_blk
                    info!(
                        event = "entering fallback",
                        height = local_height,
                        iter = local_header.iteration,
                        new_iter = remote_header.iteration,
                    );

                    // Retrieve prev_block state
                    let prev_state = acc
                        .db
                        .read()
                        .await
                        .view(|t| {
                            let res = t
                                .block_header(&remote_header.prev_block_hash)?
                                .map(|prev| prev.state_hash);

                            anyhow::Ok(res)
                        })?
                        .ok_or_else(|| {
                            anyhow::anyhow!("could not retrieve state_hash")
                        })?;

                    match fallback::WithContext::new(acc.deref())
                        .try_revert(
                            local_header,
                            remote_header,
                            RevertTarget::Commit(prev_state),
                        )
                        .await
                    {
                        Ok(_) => {
                            // Successfully fallbacked to prev_blk
                            counter!("dusk_fallback_count").increment(1);

                            // Blacklist the local_blk so we discard it if
                            // we receive it again
                            self.blacklisted_blocks
                                .write()
                                .await
                                .insert(local_header.hash);

                            // After reverting we can accept `remote_blk` as the
                            // new tip
                            acc.accept_block(remote_blk, true).await?;
                            return Ok(None);
                        }
                        Err(e) => {
                            // Fallback failed: keep our chain as-is and
                            // ignore remote_blk.
                            error!(
                                event = "fallback failed",
                                height = local_height,
                                remote_height,
                                err = format!("{:?}", e)
                            );
                            return Ok(None);
                        }
                    }
                }
                Ordering::Greater => {
                    // If remote_blk.iteration > local_blk.iteration, we send
                    // the sender our local block. This
                    // behavior is intended to make the peer
                    // switch to our higher-priority block.
                    if let Some(meta) = metadata {
                        let remote_source = meta.src_addr;

                        debug!("sending our lower-iteration block at height {local_height} to {remote_source}");

                        let msg = Message::from(local_blk);
                        let net = self.network.read().await;
                        let send = net.send_to_peer(msg, remote_source);
                        if let Err(e) = send.await {
                            warn!("Unable to send_to_peer {e}")
                        };
                    }
                }
                Ordering::Equal => {
                    // If remote_blk and local_blk have the same iteration, it
                    // means two conflicting candidates have been generated
                    let local_hash = to_str(&local_header.hash);
                    let remote_hash = to_str(&remote_header.hash);
                    warn!("Double candidate detected. Local block: {local_hash}, remote block {remote_hash}");
                }
            }

            return Ok(None);
        }

        // If remote_blk is a successor of our tip, we try to accept it
        if remote_height == tip_height + 1 {
            let finalized = acc.accept_block(remote_blk, true).await?;

            // On first final block accepted while we're inSync, clear
            // blacklisted blocks
            if finalized {
                self.blacklisted_blocks.write().await.clear();
            }

            // If the accepted block is the one requested to presync peer,
            // switch to OutOfSync/Syncing mode
            if let Some(metadata) = &metadata {
                let same = self
                    .presync
                    .as_ref()
                    .map(|presync| {
                        metadata.src_addr == presync.peer_addr
                            && remote_height == presync.start_height() + 1
                    })
                    .unwrap_or_default();
                if same {
                    // Hand the presync info to the caller, which triggers
                    // the transition to OutOfSync.
                    return Ok(self.presync.take());
                }
            }

            return Ok(None);
        }

        // If remote_blk.height > tip.height+1, we might be out of sync.
        // Before switching to outOfSync mode and download missing blocks,
        // we ensure that the peer has a valid successor of tip
        if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
            match self.presync.as_mut() {
                // If there's no active presync process, we proceed with
                // validation
                None => {
                    self.presync = Some(PresyncInfo::from_block(
                        peer_addr,
                        remote_blk.clone(),
                        tip_height,
                    ));

                    self.request_block(tip_height + 1, peer_addr).await;
                }
                // If there's an active presync process, we add the received
                // block to the pool so to process it when the sync procedure
                // will start
                Some(pre) => {
                    if pre.peer_addr == peer_addr {
                        pre.pool.push(remote_blk.clone())
                    }
                }
            }
        }

        Ok(None)
    }

    /// Requests a block by height from a `peer_addr`
    ///
    /// Errors are logged, not propagated: a failed request only delays
    /// presync until the next opportunity.
    async fn request_block(&self, height: u64, peer_addr: SocketAddr) {
        let network = self.network.read().await;
        let mut inv = Inv::new(1);
        inv.add_block_from_height(height);
        let this_peer = *network.public_addr();
        // NOTE(review): `u64::MAX` appears to be a TTL/expiry sentinel and
        // `1` a hop/result limit — confirm against GetResource::new.
        let req = GetResource::new(inv, Some(this_peer), u64::MAX, 1);
        debug!(event = "request block by height", ?req, ?peer_addr);

        if let Err(err) = network.send_to_peer(req.into(), peer_addr).await {
            warn!("could not request block {err}")
        }
    }

    /// Periodic tick while in sync.
    ///
    /// Expires a stale presync process whose timer has elapsed. Always
    /// returns `Ok(false)`: the heartbeat itself never requests a state
    /// transition.
    pub async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
        if let Some(pre_sync) = &mut self.presync {
            if pre_sync.expiry <= Instant::now() {
                // Reset presync if it timed out
                self.presync = None;
            }
        }

        Ok(false)
    }
}