1use crate::LOG_TARGET;
20use log::trace;
21use pezsc_network_common::sync::message;
22use pezsc_network_types::PeerId;
23use pezsp_runtime::traits::{Block as BlockT, NumberFor, One};
24use std::{
25 cmp,
26 collections::{BTreeMap, HashMap},
27 ops::Range,
28};
29
/// Block data paired with the peer it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
    /// The block in its sync-message form.
    pub block: message::BlockData<B>,
    /// The peer that supplied this block, if any. `BlockCollection::insert` sets this to
    /// `Some(who)` for every block it stores.
    pub origin: Option<PeerId>,
}
38
/// State of a single contiguous block range tracked by `BlockCollection`.
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
    /// The range has been requested. `len` is the number of blocks in the range and
    /// `downloading` counts the peers it is currently requested from.
    Downloading { len: NumberFor<B>, downloading: u32 },
    /// Block data for the range has been inserted but not yet handed out by `ready_blocks`.
    Complete(Vec<BlockData<B>>),
    /// The block data was drained by `ready_blocks`; only the length of the range is kept.
    Queued { len: NumberFor<B> },
}
45
46impl<B: BlockT> BlockRangeState<B> {
47 pub fn len(&self) -> NumberFor<B> {
48 match *self {
49 Self::Downloading { len, .. } => len,
50 Self::Complete(ref blocks) => (blocks.len() as u32).into(),
51 Self::Queued { len } => len,
52 }
53 }
54}
55
/// Bookkeeping for block ranges that are being downloaded, have been downloaded, or have been
/// handed out for import.
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
    /// Range states keyed by the number of the first block of each range.
    blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
    /// For every peer with an outstanding request, the start of the range it was asked for.
    peer_requests: HashMap<PeerId, NumberFor<B>>,
    /// Ranges handed out by `ready_blocks`, keyed by the hash of the first block of the range.
    /// The value is `(start, end)` with `end` exclusive.
    queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}
66
67impl<B: BlockT> BlockCollection<B> {
68 pub fn new() -> Self {
70 Self {
71 blocks: BTreeMap::new(),
72 peer_requests: HashMap::new(),
73 queued_blocks: HashMap::new(),
74 }
75 }
76
77 pub fn clear(&mut self) {
79 self.blocks.clear();
80 self.peer_requests.clear();
81 }
82
83 pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
85 if blocks.is_empty() {
86 return;
87 }
88
89 match self.blocks.get(&start) {
90 Some(&BlockRangeState::Downloading { .. }) => {
91 trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
92 },
93 Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
94 trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
95 return;
96 },
97 _ => (),
98 }
99
100 self.blocks.insert(
101 start,
102 BlockRangeState::Complete(
103 blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
104 ),
105 );
106 }
107
    /// Picks the next block range to request from peer `who` and marks it as being downloaded.
    ///
    /// * `count` - maximum number of blocks in a newly created range.
    /// * `peer_best` - best block number of the peer; the returned range never extends past it.
    /// * `common` - common block number with the peer; nothing is requested at or below it.
    /// * `max_parallel` - an existing `Downloading` range is handed out again only while its
    ///   `downloading` counter is below this value.
    /// * `max_ahead` - a range is refused if it starts more than this many blocks above the
    ///   lowest range start currently tracked.
    ///
    /// Returns `None` when `peer_best <= common`, when the selected range starts above
    /// `peer_best`, or when it is too far ahead of the first tracked range. Otherwise the range
    /// is recorded in `peer_requests` and `blocks` and returned.
    ///
    /// # Panics
    ///
    /// Panics if the selected range turns out to be empty. The range has already been recorded
    /// at that point.
    pub fn needed_blocks(
        &mut self,
        who: PeerId,
        count: u32,
        peer_best: NumberFor<B>,
        common: NumberFor<B>,
        max_parallel: u32,
        max_ahead: u32,
    ) -> Option<Range<NumberFor<B>>> {
        if peer_best <= common {
            // The peer has nothing above the common block.
            return None;
        }
        // First block number that has to be downloaded from this peer.
        let first_different = common + <NumberFor<B>>::one();
        let count = (count as u32).into();
        let (mut range, downloading) = {
            // Walk the tracked ranges in ascending order as (previous, next) pairs, looking for
            // a range to re-request or a gap to fill.
            let mut downloading_iter = self.blocks.iter().peekable();
            let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
            loop {
                let next = downloading_iter.next();
                break match (prev, next) {
                    // The previous range is already being downloaded by fewer than
                    // `max_parallel` peers and starts at or above `first_different`:
                    // request the same range again.
                    (Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
                        if downloading < max_parallel && *start >= first_different =>
                    {
                        (*start..*start + *len, downloading)
                    },
                    // There is a gap between the previous and the next range, and the gap
                    // starts at or above `first_different`: request up to `count` blocks of it.
                    (Some((start, r)), Some((next_start, _)))
                        if *start + r.len() < *next_start
                            && *start + r.len() >= first_different =>
                    {
                        (*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
                    },
                    // The previous range is the last one: request `count` blocks right after
                    // it, unless that position is below `first_different`.
                    (Some((start, r)), None) if *start + r.len() >= first_different => {
                        (*start + r.len()..*start + r.len() + count, 0)
                    },
                    // No range left to inspect: request `count` blocks starting at
                    // `first_different`.
                    (None, None) => (first_different..first_different + count, 0),
                    // The first tracked range starts above `first_different`: request the gap
                    // in front of it.
                    (None, Some((start, _))) if *start > first_different => {
                        (first_different..cmp::min(first_different + count, *start), 0)
                    },
                    // Nothing matched; advance to the next pair.
                    _ => {
                        prev = next;
                        continue;
                    },
                };
            }
        };
        // Crop the range to what the peer can actually provide.
        if range.start > peer_best {
            trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
            return None;
        }
        range.end = cmp::min(peer_best + One::one(), range.end);

        // Refuse ranges that start more than `max_ahead` above the lowest tracked range start.
        if self
            .blocks
            .iter()
            .next()
            .map_or(false, |(n, _)| range.start > *n + max_ahead.into())
        {
            trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
            return None;
        }

        // Record the request for the peer and mark the range as downloading. When an existing
        // `Downloading` range was re-requested, its counter is bumped by one.
        self.peer_requests.insert(who, range.start);
        self.blocks.insert(
            range.start,
            BlockRangeState::Downloading {
                len: range.end - range.start,
                downloading: downloading + 1,
            },
        );
        // An empty range is treated as a broken invariant.
        if range.end <= range.start {
            panic!(
                "Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
                range, count, peer_best, common, self.blocks
            );
        }
        Some(range)
    }
203
204 pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
210 let mut ready = Vec::new();
211
212 let mut prev = from;
213 for (&start, range_data) in &mut self.blocks {
214 if start > prev {
215 break;
216 }
217 let len = match range_data {
218 BlockRangeState::Complete(blocks) => {
219 let len = (blocks.len() as u32).into();
220 prev = start + len;
221 if let Some(BlockData { block, .. }) = blocks.first() {
222 self.queued_blocks
223 .insert(block.hash, (start, start + (blocks.len() as u32).into()));
224 }
225 ready.append(blocks);
227 len
228 },
229 BlockRangeState::Queued { .. } => continue,
230 _ => break,
231 };
232 *range_data = BlockRangeState::Queued { len };
233 }
234 trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
235 ready
236 }
237
238 pub fn clear_queued(&mut self, hash: &B::Hash) {
239 if let Some((from, to)) = self.queued_blocks.remove(hash) {
240 let mut block_num = from;
241 while block_num < to {
242 self.blocks.remove(&block_num);
243 block_num += One::one();
244 }
245 trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
246 }
247 }
248
249 pub fn clear_peer_download(&mut self, who: &PeerId) {
250 if let Some(start) = self.peer_requests.remove(who) {
251 let remove = match self.blocks.get_mut(&start) {
252 Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
253 if *downloading > 1 =>
254 {
255 *downloading -= 1;
256 false
257 },
258 Some(&mut BlockRangeState::Downloading { .. }) => true,
259 _ => false,
260 };
261 if remove {
262 self.blocks.remove(&start);
263 }
264 }
265 }
266}
267
268#[cfg(test)]
269mod test {
270 use super::{BlockCollection, BlockData, BlockRangeState};
271 use pezsc_network_common::sync::message;
272 use pezsc_network_types::PeerId;
273 use pezsp_core::H256;
274 use pezsp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
275
    /// Concrete block type used to instantiate `BlockCollection` in these tests.
    type Block = RawBlock<TestXt<MockCallU64, ()>>;
277
278 fn is_empty(bc: &BlockCollection<Block>) -> bool {
279 bc.blocks.is_empty() && bc.peer_requests.is_empty()
280 }
281
282 fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
283 (0..n)
284 .map(|_| message::generic::BlockData {
285 hash: H256::random(),
286 header: None,
287 body: None,
288 indexed_body: None,
289 message_queue: None,
290 receipt: None,
291 justification: None,
292 justifications: None,
293 })
294 .collect()
295 }
296
297 #[test]
298 fn create_clear() {
299 let mut bc = BlockCollection::new();
300 assert!(is_empty(&bc));
301 bc.insert(1, generate_blocks(100), PeerId::random());
302 assert!(!is_empty(&bc));
303 bc.clear();
304 assert!(is_empty(&bc));
305 }
306
    /// Walks through an out-of-order download with three peers and checks which ranges are
    /// requested and which blocks become ready at each step.
    #[test]
    fn insert_blocks() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let blocks = generate_blocks(150);
        // With `max_parallel = 1` every peer is handed its own consecutive range.
        assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
        assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
        assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));

        // peer1 delivers 41..81 first; nothing is ready because 1..41 is still downloading.
        bc.clear_peer_download(&peer1);
        bc.insert(41, blocks[41..81].to_vec(), peer1);
        assert_eq!(bc.ready_blocks(1), vec![]);
        // The next range for peer1 is cropped to the peer's best block (150).
        assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
        // peer0 delivers only the first 10 blocks of its range.
        bc.clear_peer_download(&peer0);
        bc.insert(1, blocks[1..11].to_vec(), peer0);

        // The remaining gap 11..41 is requested, and the 10 delivered blocks are ready.
        assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
        assert_eq!(
            bc.ready_blocks(1),
            blocks[1..11]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
                .collect::<Vec<_>>()
        );

        // peer0 fills the gap; 11..41 and the earlier 41..81 are now ready together.
        bc.clear_peer_download(&peer0);
        bc.insert(11, blocks[11..41].to_vec(), peer0);

        let ready = bc.ready_blocks(12);
        assert_eq!(
            ready[..30],
            blocks[11..41]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
                .collect::<Vec<_>>()[..]
        );
        assert_eq!(
            ready[30..],
            blocks[41..81]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
                .collect::<Vec<_>>()[..]
        );

        // peer2 drops its request and asks again with common = 80: the same gap is handed out.
        bc.clear_peer_download(&peer2);
        assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
        bc.clear_peer_download(&peer2);
        bc.insert(81, blocks[81..121].to_vec(), peer2);
        bc.clear_peer_download(&peer1);
        bc.insert(121, blocks[121..150].to_vec(), peer1);

        // Starting below 81 yields nothing; starting at 81 yields both remaining ranges.
        assert_eq!(bc.ready_blocks(80), vec![]);
        let ready = bc.ready_blocks(81);
        assert_eq!(
            ready[..40],
            blocks[81..121]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
                .collect::<Vec<_>>()[..]
        );
        assert_eq!(
            ready[40..],
            blocks[121..150]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
                .collect::<Vec<_>>()[..]
        );
    }
379
    /// Checks the `max_ahead` limit when a downloaded range lies far above the ranges in flight.
    #[test]
    fn large_gap() {
        let mut bc: BlockCollection<Block> = BlockCollection::new();
        // Seed the collection directly: one range in flight and one completed range far ahead.
        bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
        let blocks = generate_blocks(10)
            .into_iter()
            .map(|b| BlockData { block: b, origin: None })
            .collect();
        bc.blocks.insert(114305, BlockRangeState::Complete(blocks));

        let peer0 = PeerId::random();
        // The gap in front of the first range is requested first.
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
        // The next candidate starts at 228, more than `max_ahead = 200` above the first range.
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None);
        // With a larger `max_ahead` the same candidate is accepted.
        assert_eq!(
            bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
            Some(100 + 128..100 + 128 + 128)
        );
    }
398
399 #[test]
400 fn no_duplicate_requests_on_fork() {
401 let mut bc = BlockCollection::new();
402 assert!(is_empty(&bc));
403 let peer = PeerId::random();
404
405 let blocks = generate_blocks(10);
406
407 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
409
410 bc.clear_peer_download(&peer);
412 bc.insert(40, blocks[..5].to_vec(), peer);
413
414 let ready = bc.ready_blocks(48);
416 assert_eq!(
417 ready,
418 blocks[..5]
419 .iter()
420 .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
421 .collect::<Vec<_>>()
422 );
423
424 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
425 }
426
427 #[test]
428 fn clear_queued_subsequent_ranges() {
429 let mut bc = BlockCollection::new();
430 assert!(is_empty(&bc));
431 let peer = PeerId::random();
432
433 let blocks = generate_blocks(10);
434
435 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
437 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
438
439 bc.clear_peer_download(&peer);
441 bc.insert(40, blocks.to_vec(), peer);
442
443 let ready = bc.ready_blocks(1000);
445 assert_eq!(
446 ready,
447 blocks
448 .iter()
449 .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
450 .collect::<Vec<_>>()
451 );
452
453 bc.clear_queued(&blocks[0].hash);
454 assert!(bc.blocks.is_empty());
455 assert!(bc.queued_blocks.is_empty());
456 }
457
458 #[test]
459 fn downloaded_range_is_requested_from_max_parallel_peers() {
460 let mut bc = BlockCollection::new();
461 assert!(is_empty(&bc));
462
463 let count = 5;
464 let max_parallel = 2;
466 let max_ahead = 200;
467
468 let peer1 = PeerId::random();
469 let peer2 = PeerId::random();
470 let peer3 = PeerId::random();
471
472 let best = 100;
474 let common = 10;
475
476 assert_eq!(
477 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
478 Some(11..16)
479 );
480 assert_eq!(
481 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
482 Some(11..16)
483 );
484 assert_eq!(
485 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
486 Some(16..21)
487 );
488 }
489 #[test]
490 fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
491 let mut bc = BlockCollection::new();
498 assert!(is_empty(&bc));
499
500 let count = 5;
501 let max_parallel = 2;
502 let max_ahead = 200;
503
504 let peer1 = PeerId::random();
505 let peer1_best = 20;
506 let peer1_common = 10;
507
508 let peer2 = PeerId::random();
510 let peer2_best = 20;
511 let peer2_common = 11; assert_eq!(
514 bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
515 Some(11..16),
516 );
517 assert_eq!(
518 bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
519 Some(16..21),
520 );
521 }
522
523 #[test]
524 fn gap_above_common_number_requested() {
525 let mut bc = BlockCollection::new();
526 assert!(is_empty(&bc));
527
528 let count = 5;
529 let best = 30;
530 let max_parallel = 1;
533 let max_ahead = 200;
534
535 let peer1 = PeerId::random();
536 let peer2 = PeerId::random();
537 let peer3 = PeerId::random();
538
539 let common = 10;
540 assert_eq!(
541 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
542 Some(11..16),
543 );
544 assert_eq!(
545 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
546 Some(16..21),
547 );
548 assert_eq!(
549 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
550 Some(21..26),
551 );
552
553 bc.clear_peer_download(&peer2);
556
557 assert_eq!(
559 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
560 Some(16..21),
561 );
562 }
563
564 #[test]
565 fn gap_below_common_number_not_requested() {
566 let mut bc = BlockCollection::new();
567 assert!(is_empty(&bc));
568
569 let count = 5;
570 let best = 30;
571 let max_parallel = 1;
574 let max_ahead = 200;
575
576 let peer1 = PeerId::random();
577 let peer2 = PeerId::random();
578 let peer3 = PeerId::random();
579
580 let common = 10;
581 assert_eq!(
582 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
583 Some(11..16),
584 );
585 assert_eq!(
586 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
587 Some(16..21),
588 );
589 assert_eq!(
590 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
591 Some(21..26),
592 );
593
594 bc.clear_peer_download(&peer2);
597
598 let common = 23;
600 assert_eq!(
601 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
602 Some(26..31), );
604 }
605
606 #[test]
607 fn range_at_the_end_above_common_number_requested() {
608 let mut bc = BlockCollection::new();
609 assert!(is_empty(&bc));
610
611 let count = 5;
612 let best = 30;
613 let max_parallel = 1;
614 let max_ahead = 200;
615
616 let peer1 = PeerId::random();
617 let peer2 = PeerId::random();
618
619 let common = 10;
620 assert_eq!(
621 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
622 Some(11..16),
623 );
624 assert_eq!(
625 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
626 Some(16..21),
627 );
628 }
629
630 #[test]
631 fn range_at_the_end_below_common_number_not_requested() {
632 let mut bc = BlockCollection::new();
633 assert!(is_empty(&bc));
634
635 let count = 5;
636 let best = 30;
637 let max_parallel = 1;
638 let max_ahead = 200;
639
640 let peer1 = PeerId::random();
641 let peer2 = PeerId::random();
642
643 let common = 10;
644 assert_eq!(
645 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
646 Some(11..16),
647 );
648
649 let common = 20;
650 assert_eq!(
651 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
652 Some(21..26), );
654 }
655}