pezsc_network_sync/
blocks.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::LOG_TARGET;
20use log::trace;
21use pezsc_network_common::sync::message;
22use pezsc_network_types::PeerId;
23use pezsp_runtime::traits::{Block as BlockT, NumberFor, One};
24use std::{
25	cmp,
26	collections::{BTreeMap, HashMap},
27	ops::Range,
28};
29
/// Block data with origin.
///
/// Pairs a block message received from the network with the peer that supplied it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
	/// The block message as received from the wire.
	pub block: message::BlockData<B>,
	/// The peer we received this block from, if any.
	pub origin: Option<PeerId>,
}
38
39#[derive(Debug)]
40enum BlockRangeState<B: BlockT> {
41	Downloading { len: NumberFor<B>, downloading: u32 },
42	Complete(Vec<BlockData<B>>),
43	Queued { len: NumberFor<B> },
44}
45
46impl<B: BlockT> BlockRangeState<B> {
47	pub fn len(&self) -> NumberFor<B> {
48		match *self {
49			Self::Downloading { len, .. } => len,
50			Self::Complete(ref blocks) => (blocks.len() as u32).into(),
51			Self::Queued { len } => len,
52		}
53	}
54}
55
56/// A collection of blocks being downloaded.
57#[derive(Default)]
58pub struct BlockCollection<B: BlockT> {
59	/// Downloaded blocks.
60	blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
61	peer_requests: HashMap<PeerId, NumberFor<B>>,
62	/// Block ranges downloaded and queued for import.
63	/// Maps start_hash => (start_num, end_num).
64	queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
65}
66
67impl<B: BlockT> BlockCollection<B> {
68	/// Create a new instance.
69	pub fn new() -> Self {
70		Self {
71			blocks: BTreeMap::new(),
72			peer_requests: HashMap::new(),
73			queued_blocks: HashMap::new(),
74		}
75	}
76
77	/// Clear everything.
78	pub fn clear(&mut self) {
79		self.blocks.clear();
80		self.peer_requests.clear();
81	}
82
83	/// Insert a set of blocks into collection.
84	pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
85		if blocks.is_empty() {
86			return;
87		}
88
89		match self.blocks.get(&start) {
90			Some(&BlockRangeState::Downloading { .. }) => {
91				trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
92			},
93			Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
94				trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
95				return;
96			},
97			_ => (),
98		}
99
100		self.blocks.insert(
101			start,
102			BlockRangeState::Complete(
103				blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
104			),
105		);
106	}
107
108	/// Returns a set of block hashes that require a header download. The returned set is marked as
109	/// being downloaded.
110	pub fn needed_blocks(
111		&mut self,
112		who: PeerId,
113		count: u32,
114		peer_best: NumberFor<B>,
115		common: NumberFor<B>,
116		max_parallel: u32,
117		max_ahead: u32,
118	) -> Option<Range<NumberFor<B>>> {
119		if peer_best <= common {
120			// Bail out early
121			return None;
122		}
123		// First block number that we need to download
124		let first_different = common + <NumberFor<B>>::one();
125		let count = (count as u32).into();
126		let (mut range, downloading) = {
127			// Iterate through the ranges in `self.blocks` looking for a range to download
128			let mut downloading_iter = self.blocks.iter().peekable();
129			let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
130			loop {
131				let next = downloading_iter.next();
132				break match (prev, next) {
133					// If we are already downloading this range, request it from `max_parallel`
134					// peers (`max_parallel = 5` by default).
135					// Do not request already downloading range from peers with common number above
136					// the range start.
137					(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
138						if downloading < max_parallel && *start >= first_different =>
139					{
140						(*start..*start + *len, downloading)
141					},
142					// If there is a gap between ranges requested, download this gap unless the peer
143					// has common number above the gap start
144					(Some((start, r)), Some((next_start, _)))
145						if *start + r.len() < *next_start
146							&& *start + r.len() >= first_different =>
147					{
148						(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
149					},
150					// Download `count` blocks after the last range requested unless the peer
151					// has common number above this new range
152					(Some((start, r)), None) if *start + r.len() >= first_different => {
153						(*start + r.len()..*start + r.len() + count, 0)
154					},
155					// If there are no ranges currently requested, download `count` blocks after
156					// `common` number
157					(None, None) => (first_different..first_different + count, 0),
158					// If the first range starts above `common + 1`, download the gap at the start
159					(None, Some((start, _))) if *start > first_different => {
160						(first_different..cmp::min(first_different + count, *start), 0)
161					},
162					// Move on to the next range pair
163					_ => {
164						prev = next;
165						continue;
166					},
167				};
168			}
169		};
170		// crop to peers best
171		if range.start > peer_best {
172			trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
173			return None;
174		}
175		range.end = cmp::min(peer_best + One::one(), range.end);
176
177		if self
178			.blocks
179			.iter()
180			.next()
181			.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
182		{
183			trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
184			return None;
185		}
186
187		self.peer_requests.insert(who, range.start);
188		self.blocks.insert(
189			range.start,
190			BlockRangeState::Downloading {
191				len: range.end - range.start,
192				downloading: downloading + 1,
193			},
194		);
195		if range.end <= range.start {
196			panic!(
197				"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
198				range, count, peer_best, common, self.blocks
199			);
200		}
201		Some(range)
202	}
203
204	/// Get a valid chain of blocks ordered in descending order and ready for importing into
205	/// the blockchain.
206	/// `from` is the maximum block number for the start of the range that we are interested in.
207	/// The function will return empty Vec if the first block ready is higher than `from`.
208	/// For each returned block hash `clear_queued` must be called at some later stage.
209	pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
210		let mut ready = Vec::new();
211
212		let mut prev = from;
213		for (&start, range_data) in &mut self.blocks {
214			if start > prev {
215				break;
216			}
217			let len = match range_data {
218				BlockRangeState::Complete(blocks) => {
219					let len = (blocks.len() as u32).into();
220					prev = start + len;
221					if let Some(BlockData { block, .. }) = blocks.first() {
222						self.queued_blocks
223							.insert(block.hash, (start, start + (blocks.len() as u32).into()));
224					}
225					// Remove all elements from `blocks` and add them to `ready`
226					ready.append(blocks);
227					len
228				},
229				BlockRangeState::Queued { .. } => continue,
230				_ => break,
231			};
232			*range_data = BlockRangeState::Queued { len };
233		}
234		trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
235		ready
236	}
237
238	pub fn clear_queued(&mut self, hash: &B::Hash) {
239		if let Some((from, to)) = self.queued_blocks.remove(hash) {
240			let mut block_num = from;
241			while block_num < to {
242				self.blocks.remove(&block_num);
243				block_num += One::one();
244			}
245			trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
246		}
247	}
248
249	pub fn clear_peer_download(&mut self, who: &PeerId) {
250		if let Some(start) = self.peer_requests.remove(who) {
251			let remove = match self.blocks.get_mut(&start) {
252				Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
253					if *downloading > 1 =>
254				{
255					*downloading -= 1;
256					false
257				},
258				Some(&mut BlockRangeState::Downloading { .. }) => true,
259				_ => false,
260			};
261			if remove {
262				self.blocks.remove(&start);
263			}
264		}
265	}
266}
267
#[cfg(test)]
mod test {
	use super::{BlockCollection, BlockData, BlockRangeState};
	use pezsc_network_common::sync::message;
	use pezsc_network_types::PeerId;
	use pezsp_core::H256;
	use pezsp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};

	type Block = RawBlock<TestXt<MockCallU64, ()>>;

	/// `true` when the collection tracks neither ranges nor peer requests.
	fn is_empty(bc: &BlockCollection<Block>) -> bool {
		bc.blocks.is_empty() && bc.peer_requests.is_empty()
	}

	/// Produce `n` blocks that carry nothing but a random hash.
	fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
		std::iter::repeat_with(|| message::generic::BlockData {
			hash: H256::random(),
			header: None,
			body: None,
			indexed_body: None,
			message_queue: None,
			receipt: None,
			justification: None,
			justifications: None,
		})
		.take(n)
		.collect()
	}

	/// The given blocks as the collection hands them out when `peer` delivered them.
	fn from_peer(blocks: &[message::BlockData<Block>], peer: PeerId) -> Vec<BlockData<Block>> {
		blocks
			.iter()
			.cloned()
			.map(|block| BlockData { block, origin: Some(peer) })
			.collect()
	}

	#[test]
	fn create_clear() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		bc.insert(1, generate_blocks(100), PeerId::random());
		assert!(!is_empty(&bc));
		bc.clear();
		assert!(is_empty(&bc));
	}

	#[test]
	fn insert_blocks() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer0 = PeerId::random();
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let blocks = generate_blocks(150);
		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
		assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));

		bc.clear_peer_download(&peer1);
		bc.insert(41, blocks[41..81].to_vec(), peer1);
		assert_eq!(bc.ready_blocks(1), vec![]);
		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
		bc.clear_peer_download(&peer0);
		bc.insert(1, blocks[1..11].to_vec(), peer0);

		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
		assert_eq!(bc.ready_blocks(1), from_peer(&blocks[1..11], peer0));

		bc.clear_peer_download(&peer0);
		bc.insert(11, blocks[11..41].to_vec(), peer0);

		let ready = bc.ready_blocks(12);
		assert_eq!(ready[..30], from_peer(&blocks[11..41], peer0)[..]);
		assert_eq!(ready[30..], from_peer(&blocks[41..81], peer1)[..]);

		bc.clear_peer_download(&peer2);
		assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
		bc.clear_peer_download(&peer2);
		bc.insert(81, blocks[81..121].to_vec(), peer2);
		bc.clear_peer_download(&peer1);
		bc.insert(121, blocks[121..150].to_vec(), peer1);

		assert_eq!(bc.ready_blocks(80), vec![]);
		let ready = bc.ready_blocks(81);
		assert_eq!(ready[..40], from_peer(&blocks[81..121], peer2)[..]);
		assert_eq!(ready[40..], from_peer(&blocks[121..150], peer1)[..]);
	}

	#[test]
	fn large_gap() {
		let mut bc: BlockCollection<Block> = BlockCollection::new();
		bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
		let far_away = generate_blocks(10)
			.into_iter()
			.map(|block| BlockData { block, origin: None })
			.collect();
		bc.blocks.insert(114305, BlockRangeState::Complete(far_away));

		let peer0 = PeerId::random();
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
		// too far ahead
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None);
		assert_eq!(
			bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
			Some(100 + 128..100 + 128 + 128)
		);
	}

	#[test]
	fn no_duplicate_requests_on_fork() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer = PeerId::random();

		let blocks = generate_blocks(10);

		// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));

		// got a response on the request for `40..45`
		bc.clear_peer_download(&peer);
		bc.insert(40, blocks[..5].to_vec(), peer);

		// our "node" started on a fork, with its current best = 47, which is > common
		let ready = bc.ready_blocks(48);
		assert_eq!(ready, from_peer(&blocks[..5], peer));

		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
	}

	#[test]
	fn clear_queued_subsequent_ranges() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer = PeerId::random();

		let blocks = generate_blocks(10);

		// Request 2 ranges
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));

		// got a response on the request for `40..50`
		bc.clear_peer_download(&peer);
		bc.insert(40, blocks.clone(), peer);

		// request any blocks starting from 1000 or lower.
		let ready = bc.ready_blocks(1000);
		assert_eq!(ready, from_peer(&blocks, peer));

		bc.clear_queued(&blocks[0].hash);
		assert!(bc.blocks.is_empty());
		assert!(bc.queued_blocks.is_empty());
	}

	#[test]
	fn downloaded_range_is_requested_from_max_parallel_peers() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		// identical ranges requested from 2 peers
		let max_parallel = 2;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		// common for all peers
		let best = 100;
		let common = 10;

		let first = bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead);
		assert_eq!(first, Some(11..16));
		let second = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		assert_eq!(second, Some(11..16));
		let third = bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead);
		assert_eq!(third, Some(16..21));
	}

	#[test]
	fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
		// A peer connects with a common number falling behind our best number
		// (either a fork or lagging behind).
		// We request a range from this peer starting at its common number + 1.
		// Even though we have less than `max_parallel` downloads, we do not request
		// this range from peers with a common number above the start of this range.

		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let max_parallel = 2;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer1_best = 20;
		let peer1_common = 10;

		// `peer2` has first different above the start of the range downloaded from `peer1`
		let peer2 = PeerId::random();
		let peer2_best = 20;
		let peer2_common = 11; // first_different = 12

		let from_peer1 =
			bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead);
		assert_eq!(from_peer1, Some(11..16));
		let from_peer2 =
			bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead);
		assert_eq!(from_peer2, Some(16..21));
	}

	#[test]
	fn gap_above_common_number_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
		// set `max_parallel = 1`
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		let common = 10;
		let first = bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead);
		assert_eq!(first, Some(11..16));
		let second = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		assert_eq!(second, Some(16..21));
		let third = bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead);
		assert_eq!(third, Some(21..26));

		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
		// also happen that 16..21 received first and got imported if our best is actually >= 15.
		bc.clear_peer_download(&peer2);

		// Some peer connects with common number below the gap. The gap is requested from it.
		let refill = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		assert_eq!(refill, Some(16..21));
	}

	#[test]
	fn gap_below_common_number_not_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
		// set `max_parallel = 1`
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		let common = 10;
		let first = bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead);
		assert_eq!(first, Some(11..16));
		let second = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		assert_eq!(second, Some(16..21));
		let third = bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead);
		assert_eq!(third, Some(21..26));

		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
		// also happen that 16..21 received first and got imported if our best is actually >= 15.
		bc.clear_peer_download(&peer2);

		// Some peer connects with common number above the gap. The gap is not requested from it.
		let common = 23;
		let beyond_gap = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		// not 16..21
		assert_eq!(beyond_gap, Some(26..31));
	}

	#[test]
	fn range_at_the_end_above_common_number_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let common = 10;
		let first = bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead);
		assert_eq!(first, Some(11..16));
		let second = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		assert_eq!(second, Some(16..21));
	}

	#[test]
	fn range_at_the_end_below_common_number_not_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let common = 10;
		let first = bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead);
		assert_eq!(first, Some(11..16));

		let common = 20;
		let second = bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead);
		// not 16..21
		assert_eq!(second, Some(21..26));
	}
}
653		);
654	}
655}