// soil_network/sync/blocks.rs
// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

use std::{
	cmp,
	collections::{BTreeMap, HashMap},
	ops::Range,
};

use log::trace;

use soil_network::common::sync::message;
use soil_network::types::PeerId;
use subsoil::runtime::traits::{Block as BlockT, NumberFor, One};

use crate::LOG_TARGET;

18/// Block data with origin.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct BlockData<B: BlockT> {
21	/// The Block Message from the wire
22	pub block: message::BlockData<B>,
23	/// The peer, we received this from
24	pub origin: Option<PeerId>,
25}
26
27#[derive(Debug)]
28enum BlockRangeState<B: BlockT> {
29	Downloading { len: NumberFor<B>, downloading: u32 },
30	Complete(Vec<BlockData<B>>),
31	Queued { len: NumberFor<B> },
32}
33
34impl<B: BlockT> BlockRangeState<B> {
35	pub fn len(&self) -> NumberFor<B> {
36		match *self {
37			Self::Downloading { len, .. } => len,
38			Self::Complete(ref blocks) => (blocks.len() as u32).into(),
39			Self::Queued { len } => len,
40		}
41	}
42}
43
44/// A collection of blocks being downloaded.
45#[derive(Default)]
46pub struct BlockCollection<B: BlockT> {
47	/// Downloaded blocks.
48	blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
49	peer_requests: HashMap<PeerId, NumberFor<B>>,
50	/// Block ranges downloaded and queued for import.
51	/// Maps start_hash => (start_num, end_num).
52	queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
53}
54
55impl<B: BlockT> BlockCollection<B> {
56	/// Create a new instance.
57	pub fn new() -> Self {
58		Self {
59			blocks: BTreeMap::new(),
60			peer_requests: HashMap::new(),
61			queued_blocks: HashMap::new(),
62		}
63	}
64
65	/// Clear everything.
66	pub fn clear(&mut self) {
67		self.blocks.clear();
68		self.peer_requests.clear();
69	}
70
71	/// Insert a set of blocks into collection.
72	pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
73		if blocks.is_empty() {
74			return;
75		}
76
77		match self.blocks.get(&start) {
78			Some(&BlockRangeState::Downloading { .. }) => {
79				trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
80			},
81			Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
82				trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
83				return;
84			},
85			_ => (),
86		}
87
88		self.blocks.insert(
89			start,
90			BlockRangeState::Complete(
91				blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
92			),
93		);
94	}
95
96	/// Returns a set of block hashes that require a header download. The returned set is marked as
97	/// being downloaded.
98	pub fn needed_blocks(
99		&mut self,
100		who: PeerId,
101		count: u32,
102		peer_best: NumberFor<B>,
103		common: NumberFor<B>,
104		max_parallel: u32,
105		max_ahead: u32,
106	) -> Option<Range<NumberFor<B>>> {
107		if peer_best <= common {
108			// Bail out early
109			return None;
110		}
111		// First block number that we need to download
112		let first_different = common + <NumberFor<B>>::one();
113		let count = (count as u32).into();
114		let (mut range, downloading) = {
115			// Iterate through the ranges in `self.blocks` looking for a range to download
116			let mut downloading_iter = self.blocks.iter().peekable();
117			let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
118			loop {
119				let next = downloading_iter.next();
120				break match (prev, next) {
121					// If we are already downloading this range, request it from `max_parallel`
122					// peers (`max_parallel = 5` by default).
123					// Do not request already downloading range from peers with common number above
124					// the range start.
125					(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
126						if downloading < max_parallel && *start >= first_different =>
127					{
128						(*start..*start + *len, downloading)
129					},
130					// If there is a gap between ranges requested, download this gap unless the peer
131					// has common number above the gap start
132					(Some((start, r)), Some((next_start, _)))
133						if *start + r.len() < *next_start
134							&& *start + r.len() >= first_different =>
135					{
136						(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
137					},
138					// Download `count` blocks after the last range requested unless the peer
139					// has common number above this new range
140					(Some((start, r)), None) if *start + r.len() >= first_different => {
141						(*start + r.len()..*start + r.len() + count, 0)
142					},
143					// If there are no ranges currently requested, download `count` blocks after
144					// `common` number
145					(None, None) => (first_different..first_different + count, 0),
146					// If the first range starts above `common + 1`, download the gap at the start
147					(None, Some((start, _))) if *start > first_different => {
148						(first_different..cmp::min(first_different + count, *start), 0)
149					},
150					// Move on to the next range pair
151					_ => {
152						prev = next;
153						continue;
154					},
155				};
156			}
157		};
158		// crop to peers best
159		if range.start > peer_best {
160			trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
161			return None;
162		}
163		range.end = cmp::min(peer_best + One::one(), range.end);
164
165		if self
166			.blocks
167			.iter()
168			.next()
169			.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
170		{
171			trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
172			return None;
173		}
174
175		self.peer_requests.insert(who, range.start);
176		self.blocks.insert(
177			range.start,
178			BlockRangeState::Downloading {
179				len: range.end - range.start,
180				downloading: downloading + 1,
181			},
182		);
183		if range.end <= range.start {
184			panic!(
185				"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
186				range, count, peer_best, common, self.blocks
187			);
188		}
189		Some(range)
190	}
191
192	/// Get a valid chain of blocks ordered in descending order and ready for importing into
193	/// the blockchain.
194	/// `from` is the maximum block number for the start of the range that we are interested in.
195	/// The function will return empty Vec if the first block ready is higher than `from`.
196	/// For each returned block hash `clear_queued` must be called at some later stage.
197	pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
198		let mut ready = Vec::new();
199
200		let mut prev = from;
201		for (&start, range_data) in &mut self.blocks {
202			if start > prev {
203				break;
204			}
205			let len = match range_data {
206				BlockRangeState::Complete(blocks) => {
207					let len = (blocks.len() as u32).into();
208					prev = start + len;
209					if let Some(BlockData { block, .. }) = blocks.first() {
210						self.queued_blocks
211							.insert(block.hash, (start, start + (blocks.len() as u32).into()));
212					}
213					// Remove all elements from `blocks` and add them to `ready`
214					ready.append(blocks);
215					len
216				},
217				BlockRangeState::Queued { .. } => continue,
218				_ => break,
219			};
220			*range_data = BlockRangeState::Queued { len };
221		}
222		trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
223		ready
224	}
225
226	pub fn clear_queued(&mut self, hash: &B::Hash) {
227		if let Some((from, to)) = self.queued_blocks.remove(hash) {
228			let mut block_num = from;
229			while block_num < to {
230				self.blocks.remove(&block_num);
231				block_num += One::one();
232			}
233			trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
234		}
235	}
236
237	pub fn clear_peer_download(&mut self, who: &PeerId) {
238		if let Some(start) = self.peer_requests.remove(who) {
239			let remove = match self.blocks.get_mut(&start) {
240				Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
241					if *downloading > 1 =>
242				{
243					*downloading -= 1;
244					false
245				},
246				Some(&mut BlockRangeState::Downloading { .. }) => true,
247				_ => false,
248			};
249			if remove {
250				self.blocks.remove(&start);
251			}
252		}
253	}
254}
255
#[cfg(test)]
mod test {
	use super::{BlockCollection, BlockData, BlockRangeState};
	use soil_network::common::sync::message;
	use soil_network::types::PeerId;
	use subsoil::core::H256;
	use subsoil::runtime::testing::{Block as RawBlock, MockCallU64, TestXt};

	type Block = RawBlock<TestXt<MockCallU64, ()>>;

	/// `true` when no ranges are tracked and no peer has an outstanding request.
	fn is_empty(bc: &BlockCollection<Block>) -> bool {
		bc.blocks.is_empty() && bc.peer_requests.is_empty()
	}

	/// Generate `n` block-data messages with random hashes and empty payloads.
	fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
		(0..n)
			.map(|_| message::generic::BlockData {
				hash: H256::random(),
				header: None,
				body: None,
				indexed_body: None,
				message_queue: None,
				receipt: None,
				justification: None,
				justifications: None,
			})
			.collect()
	}

	#[test]
	fn create_clear() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		bc.insert(1, generate_blocks(100), PeerId::random());
		assert!(!is_empty(&bc));
		bc.clear();
		assert!(is_empty(&bc));
	}

	#[test]
	fn insert_blocks() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer0 = PeerId::random();
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let blocks = generate_blocks(150);
		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
		assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));

		bc.clear_peer_download(&peer1);
		bc.insert(41, blocks[41..81].to_vec(), peer1);
		assert_eq!(bc.ready_blocks(1), vec![]);
		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
		bc.clear_peer_download(&peer0);
		bc.insert(1, blocks[1..11].to_vec(), peer0);

		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
		assert_eq!(
			bc.ready_blocks(1),
			blocks[1..11]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
				.collect::<Vec<_>>()
		);

		bc.clear_peer_download(&peer0);
		bc.insert(11, blocks[11..41].to_vec(), peer0);

		let ready = bc.ready_blocks(12);
		assert_eq!(
			ready[..30],
			blocks[11..41]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
				.collect::<Vec<_>>()[..]
		);
		assert_eq!(
			ready[30..],
			blocks[41..81]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
				.collect::<Vec<_>>()[..]
		);

		bc.clear_peer_download(&peer2);
		assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
		bc.clear_peer_download(&peer2);
		bc.insert(81, blocks[81..121].to_vec(), peer2);
		bc.clear_peer_download(&peer1);
		bc.insert(121, blocks[121..150].to_vec(), peer1);

		assert_eq!(bc.ready_blocks(80), vec![]);
		let ready = bc.ready_blocks(81);
		assert_eq!(
			ready[..40],
			blocks[81..121]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
				.collect::<Vec<_>>()[..]
		);
		assert_eq!(
			ready[40..],
			blocks[121..150]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
				.collect::<Vec<_>>()[..]
		);
	}

	#[test]
	fn large_gap() {
		let mut bc: BlockCollection<Block> = BlockCollection::new();
		bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
		let blocks = generate_blocks(10)
			.into_iter()
			.map(|b| BlockData { block: b, origin: None })
			.collect();
		bc.blocks.insert(114305, BlockRangeState::Complete(blocks));

		let peer0 = PeerId::random();
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None); // too far ahead
		assert_eq!(
			bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
			Some(100 + 128..100 + 128 + 128)
		);
	}

	#[test]
	fn no_duplicate_requests_on_fork() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer = PeerId::random();

		let blocks = generate_blocks(10);

		// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));

		// got a response on the request for `40..45`
		bc.clear_peer_download(&peer);
		bc.insert(40, blocks[..5].to_vec(), peer);

		// our "node" started on a fork, with its current best = 47, which is > common
		let ready = bc.ready_blocks(48);
		assert_eq!(
			ready,
			blocks[..5]
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
				.collect::<Vec<_>>()
		);

		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
	}

	#[test]
	fn clear_queued_subsequent_ranges() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));
		let peer = PeerId::random();

		let blocks = generate_blocks(10);

		// Request 2 ranges
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));

		// got a response on the request for `40..50`
		bc.clear_peer_download(&peer);
		bc.insert(40, blocks.to_vec(), peer);

		// request any blocks starting from 1000 or lower.
		let ready = bc.ready_blocks(1000);
		assert_eq!(
			ready,
			blocks
				.iter()
				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
				.collect::<Vec<_>>()
		);

		bc.clear_queued(&blocks[0].hash);
		assert!(bc.blocks.is_empty());
		assert!(bc.queued_blocks.is_empty());
	}

	#[test]
	fn downloaded_range_is_requested_from_max_parallel_peers() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		// identical ranges requested from 2 peers
		let max_parallel = 2;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		// common for all peers
		let best = 100;
		let common = 10;

		assert_eq!(
			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
			Some(11..16)
		);
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(11..16)
		);
		assert_eq!(
			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
			Some(16..21)
		);
	}

	#[test]
	fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
		// A peer connects with a common number falling behind our best number
		// (either a fork or lagging behind).
		// We request a range from this peer starting at its common number + 1.
		// Even though we have less than `max_parallel` downloads, we do not request
		// this range from peers with a common number above the start of this range.

		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let max_parallel = 2;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer1_best = 20;
		let peer1_common = 10;

		// `peer2` has first different above the start of the range downloaded from `peer1`
		let peer2 = PeerId::random();
		let peer2_best = 20;
		let peer2_common = 11; // first_different = 12

		assert_eq!(
			bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
			Some(11..16),
		);
		assert_eq!(
			bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
			Some(16..21),
		);
	}

	#[test]
	fn gap_above_common_number_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
		// set `max_parallel = 1`
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		let common = 10;
		assert_eq!(
			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
			Some(11..16),
		);
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(16..21),
		);
		assert_eq!(
			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
			Some(21..26),
		);

		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
		// also happen that 16..21 received first and got imported if our best is actually >= 15.
		bc.clear_peer_download(&peer2);

		// Some peer connects with common number below the gap. The gap is requested from it.
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(16..21),
		);
	}

	#[test]
	fn gap_below_common_number_not_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
		// set `max_parallel = 1`
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		let common = 10;
		assert_eq!(
			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
			Some(11..16),
		);
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(16..21),
		);
		assert_eq!(
			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
			Some(21..26),
		);

		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
		// also happen that 16..21 received first and got imported if our best is actually >= 15.
		bc.clear_peer_download(&peer2);

		// Some peer connects with common number above the gap. The gap is not requested from it.
		let common = 23;
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(26..31), // not 16..21
		);
	}

	#[test]
	fn range_at_the_end_above_common_number_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let common = 10;
		assert_eq!(
			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
			Some(11..16),
		);
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(16..21),
		);
	}

	#[test]
	fn range_at_the_end_below_common_number_not_requested() {
		let mut bc = BlockCollection::new();
		assert!(is_empty(&bc));

		let count = 5;
		let best = 30;
		let max_parallel = 1;
		let max_ahead = 200;

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();

		let common = 10;
		assert_eq!(
			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
			Some(11..16),
		);

		let common = 20;
		assert_eq!(
			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
			Some(21..26), // not 16..21
		);
	}
}